multifilesystem.py

# This file is part of Radicale Server - Calendar Server
# Copyright © 2014 Jean-Marc Martins
# Copyright © 2012-2017 Guillaume Ayoub
# Copyright © 2017-2018 Unrud <unrud@outlook.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Radicale. If not, see <http://www.gnu.org/licenses/>.

import binascii
import contextlib
import json
import logging
import os
import pickle
import posixpath
import shlex
import subprocess
import time
from contextlib import contextmanager
from hashlib import md5
from itertools import chain
from tempfile import NamedTemporaryFile, TemporaryDirectory

import vobject

from radicale import item as radicale_item
from radicale import pathutils, storage
from radicale.item import filter as radicale_filter
from radicale.log import logger

class Collection(storage.BaseCollection):
    """Collection stored in several files per calendar."""

    @classmethod
    def static_init(cls):
        # init storage lock
        folder = os.path.expanduser(cls.configuration.get(
            "storage", "filesystem_folder"))
        cls._makedirs_synced(folder)
        lock_path = os.path.join(folder, ".Radicale.lock")
        cls._lock = pathutils.RwLock(lock_path)

    def __init__(self, path, filesystem_path=None):
        folder = self._get_collection_root_folder()
        # Path should already be sanitized
        self.path = pathutils.strip_path(path)
        self._encoding = self.configuration.get("encoding", "stock")
        if filesystem_path is None:
            filesystem_path = pathutils.path_to_filesystem(folder, self.path)
        self._filesystem_path = filesystem_path
        self._props_path = os.path.join(
            self._filesystem_path, ".Radicale.props")
        self._meta_cache = None
        self._etag_cache = None
        self._item_cache_cleaned = False

    @classmethod
    def _get_collection_root_folder(cls):
        filesystem_folder = os.path.expanduser(
            cls.configuration.get("storage", "filesystem_folder"))
        return os.path.join(filesystem_folder, "collection-root")

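    # ``_atomic_write`` writes to a temporary file in the target directory,
    # fsyncs it (if enabled in the configuration) and then moves it into
    # place with ``os.replace`` (or a caller-supplied ``replace_fn``), so
    # readers never see a partially written file.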
    @contextmanager
    def _atomic_write(self, path, mode="w", newline=None, sync_directory=True,
                      replace_fn=os.replace):
        directory = os.path.dirname(path)
        tmp = NamedTemporaryFile(
            mode=mode, dir=directory, delete=False, prefix=".Radicale.tmp-",
            newline=newline, encoding=None if "b" in mode else self._encoding)
        try:
            yield tmp
            tmp.flush()
            try:
                self._fsync(tmp.fileno())
            except OSError as e:
                raise RuntimeError("Fsync'ing file %r failed: %s" %
                                   (path, e)) from e
            tmp.close()
            replace_fn(tmp.name, path)
        except BaseException:
            tmp.close()
            os.remove(tmp.name)
            raise
        if sync_directory:
            self._sync_directory(directory)

    @classmethod
    def _fsync(cls, fd):
        if cls.configuration.getboolean("internal", "filesystem_fsync"):
            pathutils.fsync(fd)

    @classmethod
    def _sync_directory(cls, path):
        """Sync directory to disk.

        This only works on POSIX and does nothing on other systems.

        """
        if not cls.configuration.getboolean("internal", "filesystem_fsync"):
            return
        if os.name == "posix":
            try:
                fd = os.open(path, 0)
                try:
                    cls._fsync(fd)
                finally:
                    os.close(fd)
            except OSError as e:
                raise RuntimeError("Fsync'ing directory %r failed: %s" %
                                   (path, e)) from e

    @classmethod
    def _makedirs_synced(cls, filesystem_path):
        """Recursively create a directory and its parents in a sync'ed way.

        This method acts silently when the folder already exists.

        """
        if os.path.isdir(filesystem_path):
            return
        parent_filesystem_path = os.path.dirname(filesystem_path)
        # Prevent infinite loop
        if filesystem_path != parent_filesystem_path:
            # Create parent dirs recursively
            cls._makedirs_synced(parent_filesystem_path)
        # Possible race!
        os.makedirs(filesystem_path, exist_ok=True)
        cls._sync_directory(parent_filesystem_path)

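    # ``discover`` yields the collection at ``path`` (or a single item when
    # the path points to a file) and, for depth "1", the collection's items
    # and its direct sub-collections.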
    @classmethod
    def discover(cls, path, depth="0", child_context_manager=(
            lambda path, href=None: contextlib.ExitStack())):
        # Path should already be sanitized
        sane_path = pathutils.strip_path(path)
        attributes = sane_path.split("/") if sane_path else []

        folder = cls._get_collection_root_folder()
        # Create the root collection
        cls._makedirs_synced(folder)
        try:
            filesystem_path = pathutils.path_to_filesystem(folder, sane_path)
        except ValueError as e:
            # Path is unsafe
            logger.debug("Unsafe path %r requested from storage: %s",
                         sane_path, e, exc_info=True)
            return

        # Check if the path exists and if it leads to a collection or an item
        if not os.path.isdir(filesystem_path):
            if attributes and os.path.isfile(filesystem_path):
                href = attributes.pop()
            else:
                return
        else:
            href = None

        sane_path = "/".join(attributes)
        collection = cls(pathutils.unstrip_path(sane_path, True))

        if href:
            yield collection._get(href)
            return

        yield collection

        if depth == "0":
            return

        for href in collection._list():
            with child_context_manager(sane_path, href):
                yield collection._get(href)

        for entry in os.scandir(filesystem_path):
            if not entry.is_dir():
                continue
            href = entry.name
            if not pathutils.is_safe_filesystem_path_component(href):
                if not href.startswith(".Radicale"):
                    logger.debug("Skipping collection %r in %r",
                                 href, sane_path)
                continue
            sane_child_path = posixpath.join(sane_path, href)
            child_path = pathutils.unstrip_path(sane_child_path, True)
            with child_context_manager(sane_child_path):
                yield cls(child_path)

    @classmethod
    def verify(cls):
        item_errors = collection_errors = 0

        @contextlib.contextmanager
        def exception_cm(sane_path, href=None):
            nonlocal item_errors, collection_errors
            try:
                yield
            except Exception as e:
                if href:
                    item_errors += 1
                    name = "item %r in %r" % (href, sane_path)
                else:
                    collection_errors += 1
                    name = "collection %r" % sane_path
                logger.error("Invalid %s: %s", name, e, exc_info=True)

        remaining_sane_paths = [""]
        while remaining_sane_paths:
            sane_path = remaining_sane_paths.pop(0)
            path = pathutils.unstrip_path(sane_path, True)
            logger.debug("Verifying collection %r", sane_path)
            with exception_cm(sane_path):
                saved_item_errors = item_errors
                collection = None
                uids = set()
                has_child_collections = False
                for item in cls.discover(path, "1", exception_cm):
                    if not collection:
                        collection = item
                        collection.get_meta()
                        continue
                    if isinstance(item, storage.BaseCollection):
                        has_child_collections = True
                        remaining_sane_paths.append(item.path)
                    elif item.uid in uids:
                        logger.error(
                            "Invalid item %r in %r: UID conflict %r",
                            item.href, sane_path, item.uid)
                    else:
                        uids.add(item.uid)
                        logger.debug("Verified item %r in %r",
                                     item.href, sane_path)
                if item_errors == saved_item_errors:
                    collection.sync()
                if has_child_collections and collection.get_meta("tag"):
                    logger.error("Invalid collection %r: %r must not have "
                                 "child collections", sane_path,
                                 collection.get_meta("tag"))
        return item_errors == 0 and collection_errors == 0

    @classmethod
    def create_collection(cls, href, items=None, props=None):
        folder = cls._get_collection_root_folder()

        # Path should already be sanitized
        sane_path = pathutils.strip_path(href)
        filesystem_path = pathutils.path_to_filesystem(folder, sane_path)

        if not props:
            cls._makedirs_synced(filesystem_path)
            return cls(pathutils.unstrip_path(sane_path, True))

        parent_dir = os.path.dirname(filesystem_path)
        cls._makedirs_synced(parent_dir)

        # Create a temporary directory with an unsafe name
        with TemporaryDirectory(
                prefix=".Radicale.tmp-", dir=parent_dir) as tmp_dir:
            # The temporary directory itself can't be renamed
            tmp_filesystem_path = os.path.join(tmp_dir, "collection")
            os.makedirs(tmp_filesystem_path)
            self = cls(pathutils.unstrip_path(sane_path, True),
                       filesystem_path=tmp_filesystem_path)
            self.set_meta(props)
            if items is not None:
                if props.get("tag") == "VCALENDAR":
                    self._upload_all_nonatomic(items, suffix=".ics")
                elif props.get("tag") == "VADDRESSBOOK":
                    self._upload_all_nonatomic(items, suffix=".vcf")

            # This operation is not atomic on the filesystem level but it's
            # very unlikely that one rename operation succeeds while the
            # other fails or that only one gets written to disk.
            if os.path.exists(filesystem_path):
                os.rename(filesystem_path, os.path.join(tmp_dir, "delete"))
            os.rename(tmp_filesystem_path, filesystem_path)
            cls._sync_directory(parent_dir)

        return cls(pathutils.unstrip_path(sane_path, True))

    def _upload_all_nonatomic(self, items, suffix=""):
        """Upload a new set of items.

        This takes a list of vobject items and
        uploads them nonatomic and without existence checks.

        """
        cache_folder = os.path.join(self._filesystem_path,
                                    ".Radicale.cache", "item")
        self._makedirs_synced(cache_folder)
        hrefs = set()
        for item in items:
            uid = item.uid
            try:
                cache_content = self._item_cache_content(item)
            except Exception as e:
                raise ValueError(
                    "Failed to store item %r in temporary collection %r: %s" %
                    (uid, self.path, e)) from e
            href_candidates = []
            if os.name in ("nt", "posix"):
                href_candidates.append(
                    lambda: uid if uid.lower().endswith(suffix.lower())
                    else uid + suffix)
            href_candidates.extend((
                lambda: radicale_item.get_etag(uid).strip('"') + suffix,
                lambda: radicale_item.find_available_uid(hrefs.__contains__,
                                                         suffix)))
            href = None

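            # ``replace_fn`` is handed to ``_atomic_write``: it tries the href
            # candidates in order until ``os.replace`` accepts one as a valid
            # file name, retrying with the next candidate when the name is
            # rejected (errno 22 on POSIX, 123 on Windows).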
            def replace_fn(source, target):
                nonlocal href
                while href_candidates:
                    href = href_candidates.pop(0)()
                    if href in hrefs:
                        continue
                    if not pathutils.is_safe_filesystem_path_component(href):
                        if not href_candidates:
                            raise pathutils.UnsafePathError(href)
                        continue
                    try:
                        return os.replace(source, pathutils.path_to_filesystem(
                            self._filesystem_path, href))
                    except OSError as e:
                        if href_candidates and (
                                os.name == "posix" and e.errno == 22 or
                                os.name == "nt" and e.errno == 123):
                            continue
                        raise

            with self._atomic_write(os.path.join(self._filesystem_path, "ign"),
                                    newline="", sync_directory=False,
                                    replace_fn=replace_fn) as f:
                f.write(item.serialize())
            hrefs.add(href)
            with self._atomic_write(os.path.join(cache_folder, href), "wb",
                                    sync_directory=False) as f:
                pickle.dump(cache_content, f)
        self._sync_directory(cache_folder)
        self._sync_directory(self._filesystem_path)

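    # ``move`` renames the item file into the target collection, moves the
    # matching item cache entry along with it and records the change in the
    # history caches of both collections.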
    @classmethod
    def move(cls, item, to_collection, to_href):
        if not pathutils.is_safe_filesystem_path_component(to_href):
            raise pathutils.UnsafePathError(to_href)
        os.replace(
            pathutils.path_to_filesystem(
                item.collection._filesystem_path, item.href),
            pathutils.path_to_filesystem(
                to_collection._filesystem_path, to_href))
        cls._sync_directory(to_collection._filesystem_path)
        if item.collection._filesystem_path != to_collection._filesystem_path:
            cls._sync_directory(item.collection._filesystem_path)

        # Move the item cache entry
        cache_folder = os.path.join(item.collection._filesystem_path,
                                    ".Radicale.cache", "item")
        to_cache_folder = os.path.join(to_collection._filesystem_path,
                                       ".Radicale.cache", "item")
        cls._makedirs_synced(to_cache_folder)
        try:
            os.replace(os.path.join(cache_folder, item.href),
                       os.path.join(to_cache_folder, to_href))
        except FileNotFoundError:
            pass
        else:
            cls._makedirs_synced(to_cache_folder)
            if cache_folder != to_cache_folder:
                cls._makedirs_synced(cache_folder)

        # Track the change
        to_collection._update_history_etag(to_href, item)
        item.collection._update_history_etag(item.href, None)
        to_collection._clean_history_cache()
        if item.collection._filesystem_path != to_collection._filesystem_path:
            item.collection._clean_history_cache()

    @classmethod
    def _clean_cache(cls, folder, names, max_age=None):
        """Delete all ``names`` in ``folder`` that are older than ``max_age``.
        """
        age_limit = time.time() - max_age if max_age is not None else None
        modified = False
        for name in names:
            if not pathutils.is_safe_filesystem_path_component(name):
                continue
            if age_limit is not None:
                try:
                    # Race: Another process might have deleted the file.
                    mtime = os.path.getmtime(os.path.join(folder, name))
                except FileNotFoundError:
                    continue
                if mtime > age_limit:
                    continue
            logger.debug("Found expired item in cache: %r", name)
            # Race: Another process might have deleted or locked the
            # file.
            try:
                os.remove(os.path.join(folder, name))
            except (FileNotFoundError, PermissionError):
                continue
            modified = True
        if modified:
            cls._sync_directory(folder)

    def _update_history_etag(self, href, item):
        """Updates and retrieves the history etag from the history cache.

        The history cache contains a file for each current and deleted item
        of the collection. These files contain the etag of the item (empty
        string for deleted items) and a history etag, which is a hash over
        the previous history etag and the etag separated by "/".

        """
        history_folder = os.path.join(self._filesystem_path,
                                      ".Radicale.cache", "history")
        try:
            with open(os.path.join(history_folder, href), "rb") as f:
                cache_etag, history_etag = pickle.load(f)
        except (FileNotFoundError, pickle.UnpicklingError, ValueError) as e:
            if isinstance(e, (pickle.UnpicklingError, ValueError)):
                logger.warning(
                    "Failed to load history cache entry %r in %r: %s",
                    href, self.path, e, exc_info=True)
            cache_etag = ""
            # Initialize with random data to prevent collisions with cleaned
            # expired items.
            history_etag = binascii.hexlify(os.urandom(16)).decode("ascii")
        etag = item.etag if item else ""
        if etag != cache_etag:
            self._makedirs_synced(history_folder)
            history_etag = radicale_item.get_etag(
                history_etag + "/" + etag).strip("\"")
            try:
                # Race: Other processes might have created and locked the file.
                with self._atomic_write(os.path.join(history_folder, href),
                                        "wb") as f:
                    pickle.dump([etag, history_etag], f)
            except PermissionError:
                pass
        return history_etag

    def _get_deleted_history_hrefs(self):
        """Returns the hrefs of all deleted items that are still in the
        history cache."""
        history_folder = os.path.join(self._filesystem_path,
                                      ".Radicale.cache", "history")
        try:
            for entry in os.scandir(history_folder):
                href = entry.name
                if not pathutils.is_safe_filesystem_path_component(href):
                    continue
                if os.path.isfile(os.path.join(self._filesystem_path, href)):
                    continue
                yield href
        except FileNotFoundError:
            pass

    def _clean_history_cache(self):
        # Delete all expired cache entries of deleted items.
        history_folder = os.path.join(self._filesystem_path,
                                      ".Radicale.cache", "history")
        self._clean_cache(history_folder, self._get_deleted_history_hrefs(),
                          max_age=self.configuration.getint(
                              "storage", "max_sync_token_age"))

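    # ``sync`` derives a token from the history etags of all present and
    # deleted items, persists the corresponding state in the sync-token
    # cache and reports the hrefs that changed since ``old_token``.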
    def sync(self, old_token=None):
        # The sync token has the form http://radicale.org/ns/sync/TOKEN_NAME
        # where TOKEN_NAME is the md5 hash of all history etags of present and
        # past items of the collection.
        def check_token_name(token_name):
            if len(token_name) != 32:
                return False
            for c in token_name:
                if c not in "0123456789abcdef":
                    return False
            return True

        old_token_name = None
        if old_token:
            # Extract the token name from the sync token
            if not old_token.startswith("http://radicale.org/ns/sync/"):
                raise ValueError("Malformed token: %r" % old_token)
            old_token_name = old_token[len("http://radicale.org/ns/sync/"):]
            if not check_token_name(old_token_name):
                raise ValueError("Malformed token: %r" % old_token)

        # Get the current state and sync-token of the collection.
        state = {}
        token_name_hash = md5()
        # Find the history of all existing and deleted items
        for href, item in chain(
                ((item.href, item) for item in self.get_all()),
                ((href, None) for href in self._get_deleted_history_hrefs())):
            history_etag = self._update_history_etag(href, item)
            state[href] = history_etag
            token_name_hash.update((href + "/" + history_etag).encode("utf-8"))
        token_name = token_name_hash.hexdigest()
        token = "http://radicale.org/ns/sync/%s" % token_name

        if token_name == old_token_name:
            # Nothing changed
            return token, ()

        token_folder = os.path.join(self._filesystem_path,
                                    ".Radicale.cache", "sync-token")
        token_path = os.path.join(token_folder, token_name)
        old_state = {}
        if old_token_name:
            # load the old token state
            old_token_path = os.path.join(token_folder, old_token_name)
            try:
                # Race: Another process might have deleted the file.
                with open(old_token_path, "rb") as f:
                    old_state = pickle.load(f)
            except (FileNotFoundError, pickle.UnpicklingError,
                    ValueError) as e:
                if isinstance(e, (pickle.UnpicklingError, ValueError)):
                    logger.warning(
                        "Failed to load stored sync token %r in %r: %s",
                        old_token_name, self.path, e, exc_info=True)
                    # Delete the damaged file
                    try:
                        os.remove(old_token_path)
                    except (FileNotFoundError, PermissionError):
                        pass
                raise ValueError("Token not found: %r" % old_token)

        # write the new token state or update the modification time of
        # existing token state
        if not os.path.exists(token_path):
            self._makedirs_synced(token_folder)
            try:
                # Race: Other processes might have created and locked the file.
                with self._atomic_write(token_path, "wb") as f:
                    pickle.dump(state, f)
            except PermissionError:
                pass
            else:
                # clean up old sync tokens and item cache
                self._clean_cache(token_folder, os.listdir(token_folder),
                                  max_age=self.configuration.getint(
                                      "storage", "max_sync_token_age"))
                self._clean_history_cache()
        else:
            # Try to update the modification time
            try:
                # Race: Another process might have deleted the file.
                os.utime(token_path)
            except FileNotFoundError:
                pass

        changes = []
        # Find all new, changed and deleted (that are still in the item cache)
        # items
        for href, history_etag in state.items():
            if history_etag != old_state.get(href):
                changes.append(href)
        # Find all deleted items that are no longer in the item cache
        for href, history_etag in old_state.items():
            if href not in state:
                changes.append(href)

        return token, changes

    def _list(self):
        for entry in os.scandir(self._filesystem_path):
            if not entry.is_file():
                continue
            href = entry.name
            if not pathutils.is_safe_filesystem_path_component(href):
                if not href.startswith(".Radicale"):
                    logger.debug("Skipping item %r in %r", href, self.path)
                continue
            yield href

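    # The item cache stores, per href, a pickled tuple of metadata derived
    # from the item (uid, etag, serialized text, name, component name and
    # time range), validated by an md5 hash over ``storage.CACHE_VERSION``
    # and the raw file content.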
    def _item_cache_hash(self, raw_text):
        _hash = md5()
        _hash.update(storage.CACHE_VERSION)
        _hash.update(raw_text)
        return _hash.hexdigest()

    def _item_cache_content(self, item, cache_hash=None):
        text = item.serialize()
        if cache_hash is None:
            cache_hash = self._item_cache_hash(text.encode(self._encoding))
        return (cache_hash, item.uid, item.etag, text, item.name,
                item.component_name, *item.time_range)

    def _store_item_cache(self, href, item, cache_hash=None):
        cache_folder = os.path.join(self._filesystem_path, ".Radicale.cache",
                                    "item")
        content = self._item_cache_content(item, cache_hash)
        self._makedirs_synced(cache_folder)
        try:
            # Race: Other processes might have created and locked the
            # file.
            with self._atomic_write(os.path.join(cache_folder, href),
                                    "wb") as f:
                pickle.dump(content, f)
        except PermissionError:
            pass
        return content

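    # When the global storage lock is already held for writing, no other
    # process can touch the cache, so ``_acquire_cache_lock`` returns a
    # no-op context manager instead of taking the per-collection cache lock.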
    def _acquire_cache_lock(self, ns=""):
        if self._lock.locked == "w":
            return contextlib.ExitStack()
        cache_folder = os.path.join(self._filesystem_path, ".Radicale.cache")
        self._makedirs_synced(cache_folder)
        lock_path = os.path.join(cache_folder,
                                 ".Radicale.lock" + (".%s" % ns if ns else ""))
        lock = pathutils.RwLock(lock_path)
        return lock.acquire("w")

    def _load_item_cache(self, href, input_hash):
        cache_folder = os.path.join(self._filesystem_path, ".Radicale.cache",
                                    "item")
        cache_hash = uid = etag = text = name = tag = start = end = None
        try:
            with open(os.path.join(cache_folder, href), "rb") as f:
                cache_hash, *content = pickle.load(f)
            if cache_hash == input_hash:
                uid, etag, text, name, tag, start, end = content
        except FileNotFoundError:
            pass
        except (pickle.UnpicklingError, ValueError) as e:
            logger.warning("Failed to load item cache entry %r in %r: %s",
                           href, self.path, e, exc_info=True)
        return cache_hash, uid, etag, text, name, tag, start, end

    def _clean_item_cache(self):
        cache_folder = os.path.join(self._filesystem_path, ".Radicale.cache",
                                    "item")
        self._clean_cache(cache_folder, (
            e.name for e in os.scandir(cache_folder) if not
            os.path.isfile(os.path.join(self._filesystem_path, e.name))))

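    # ``_get`` reads the raw file, validates the cached metadata against a
    # hash of the file content and regenerates the cache entry under the
    # item cache lock when it is missing or stale.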
    def _get(self, href, verify_href=True):
        if verify_href:
            try:
                if not pathutils.is_safe_filesystem_path_component(href):
                    raise pathutils.UnsafePathError(href)
                path = pathutils.path_to_filesystem(
                    self._filesystem_path, href)
            except ValueError as e:
                logger.debug(
                    "Can't translate name %r safely to filesystem in %r: %s",
                    href, self.path, e, exc_info=True)
                return None
        else:
            path = os.path.join(self._filesystem_path, href)
        try:
            with open(path, "rb") as f:
                raw_text = f.read()
        except (FileNotFoundError, IsADirectoryError):
            return None
        except PermissionError:
            # Windows raises ``PermissionError`` when ``path`` is a directory
            if (os.name == "nt" and
                    os.path.isdir(path) and os.access(path, os.R_OK)):
                return None
            raise
        # The hash of the component in the file system. This is used to check
        # if the entry in the cache is still valid.
        input_hash = self._item_cache_hash(raw_text)
        cache_hash, uid, etag, text, name, tag, start, end = \
            self._load_item_cache(href, input_hash)
        if input_hash != cache_hash:
            with self._acquire_cache_lock("item"):
                # Lock the item cache to prevent multiple processes from
                # generating the same data in parallel.
                # This improves the performance for multiple requests.
                if self._lock.locked == "r":
                    # Check if another process created the file in the meantime
                    cache_hash, uid, etag, text, name, tag, start, end = \
                        self._load_item_cache(href, input_hash)
                if input_hash != cache_hash:
                    try:
                        vobject_items = tuple(vobject.readComponents(
                            raw_text.decode(self._encoding)))
                        radicale_item.check_and_sanitize_items(
                            vobject_items, tag=self.get_meta("tag"))
                        vobject_item, = vobject_items
                        temp_item = radicale_item.Item(
                            collection=self, vobject_item=vobject_item)
                        cache_hash, uid, etag, text, name, tag, start, end = \
                            self._store_item_cache(
                                href, temp_item, input_hash)
                    except Exception as e:
                        raise RuntimeError("Failed to load item %r in %r: %s" %
                                           (href, self.path, e)) from e
                    # Clean cache entries once after the data in the file
                    # system was edited externally.
                    if not self._item_cache_cleaned:
                        self._item_cache_cleaned = True
                        self._clean_item_cache()
        last_modified = time.strftime(
            "%a, %d %b %Y %H:%M:%S GMT",
            time.gmtime(os.path.getmtime(path)))
        # Don't keep reference to ``vobject_item``, because it requires a lot
        # of memory.
        return radicale_item.Item(
            collection=self, href=href, last_modified=last_modified, etag=etag,
            text=text, uid=uid, name=name, component_name=tag,
            time_range=(start, end))

    def get_multi(self, hrefs):
        # It's faster to check for file name collisions here, because
        # we only need to call os.listdir once.
        files = None
        for href in hrefs:
            if files is None:
                # List the directory only after ``hrefs`` yields its first
                # item; the iterator may be empty, in which case this loop
                # body never runs.
                files = os.listdir(self._filesystem_path)
            path = os.path.join(self._filesystem_path, href)
            if (not pathutils.is_safe_filesystem_path_component(href) or
                    href not in files and os.path.lexists(path)):
                logger.debug(
                    "Can't translate name safely to filesystem: %r", href)
                yield (href, None)
            else:
                yield (href, self._get(href, verify_href=False))

    def get_all(self):
        # We don't need to check for collisions, because the file names
        # are from os.listdir.
        return (self._get(href, verify_href=False) for href in self._list())

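    # ``get_filtered`` applies a simplified prefilter (component tag and time
    # range) before the full filter; the second element of each yielded tuple
    # indicates whether the item is already known to match the filters
    # completely.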
    def get_filtered(self, filters):
        tag, start, end, simple = radicale_filter.simplify_prefilters(
            filters, collection_tag=self.get_meta("tag"))
        if not tag:
            # no filter
            yield from ((item, simple) for item in self.get_all())
            return
        for item in (self._get(h, verify_href=False) for h in self._list()):
            istart, iend = item.time_range
            if tag == item.component_name and istart < end and iend > start:
                yield item, simple and (start <= istart or iend <= end)

    def upload(self, href, item):
        if not pathutils.is_safe_filesystem_path_component(href):
            raise pathutils.UnsafePathError(href)
        try:
            self._store_item_cache(href, item)
        except Exception as e:
            raise ValueError("Failed to store item %r in collection %r: %s" %
                             (href, self.path, e)) from e
        path = pathutils.path_to_filesystem(self._filesystem_path, href)
        with self._atomic_write(path, newline="") as fd:
            fd.write(item.serialize())
        # Clean the cache after the actual item is stored, or the cache entry
        # will be removed again.
        self._clean_item_cache()
        # Track the change
        self._update_history_etag(href, item)
        self._clean_history_cache()
        return self._get(href, verify_href=False)

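    # Deleting a collection first tries a plain ``rmdir``; if the collection
    # is not empty, it is moved into a temporary directory so the removal
    # appears atomic, then the parent directory is synced.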
    def delete(self, href=None):
        if href is None:
            # Delete the collection
            parent_dir = os.path.dirname(self._filesystem_path)
            try:
                os.rmdir(self._filesystem_path)
            except OSError:
                with TemporaryDirectory(
                        prefix=".Radicale.tmp-", dir=parent_dir) as tmp:
                    os.rename(self._filesystem_path, os.path.join(
                        tmp, os.path.basename(self._filesystem_path)))
                    self._sync_directory(parent_dir)
            else:
                self._sync_directory(parent_dir)
        else:
            # Delete an item
            if not pathutils.is_safe_filesystem_path_component(href):
                raise pathutils.UnsafePathError(href)
            path = pathutils.path_to_filesystem(self._filesystem_path, href)
            if not os.path.isfile(path):
                raise storage.ComponentNotFoundError(href)
            os.remove(path)
            self._sync_directory(os.path.dirname(path))
            # Track the change
            self._update_history_etag(href, None)
            self._clean_history_cache()

    def get_meta(self, key=None):
        # reuse cached value if the storage is read-only
        if self._lock.locked == "w" or self._meta_cache is None:
            try:
                try:
                    with open(self._props_path, encoding=self._encoding) as f:
                        self._meta_cache = json.load(f)
                except FileNotFoundError:
                    self._meta_cache = {}
                radicale_item.check_and_sanitize_props(self._meta_cache)
            except ValueError as e:
                raise RuntimeError("Failed to load properties of collection "
                                   "%r: %s" % (self.path, e)) from e
        return self._meta_cache.get(key) if key else self._meta_cache

    def set_meta(self, props):
        with self._atomic_write(self._props_path, "w") as f:
            json.dump(props, f, sort_keys=True)

    @property
    def last_modified(self):
        relevant_files = chain(
            (self._filesystem_path,),
            (self._props_path,) if os.path.exists(self._props_path) else (),
            (os.path.join(self._filesystem_path, h) for h in self._list()))
        last = max(map(os.path.getmtime, relevant_files))
        return time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(last))

    @property
    def etag(self):
        # reuse cached value if the storage is read-only
        if self._lock.locked == "w" or self._etag_cache is None:
            self._etag_cache = super().etag
        return self._etag_cache

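    # ``acquire_lock`` wraps the global storage lock; after a write request
    # finishes (while the lock is still held) it runs the configured storage
    # hook, if any, with the user name substituted into the command.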
    @classmethod
    @contextmanager
    def acquire_lock(cls, mode, user=None):
        with cls._lock.acquire(mode):
            yield
            # execute hook
            hook = cls.configuration.get("storage", "hook")
            if mode == "w" and hook:
                folder = os.path.expanduser(cls.configuration.get(
                    "storage", "filesystem_folder"))
                logger.debug("Running hook")
                debug = logger.isEnabledFor(logging.DEBUG)
                p = subprocess.Popen(
                    hook % {"user": shlex.quote(user or "Anonymous")},
                    stdin=subprocess.DEVNULL,
                    stdout=subprocess.PIPE if debug else subprocess.DEVNULL,
                    stderr=subprocess.PIPE if debug else subprocess.DEVNULL,
                    shell=True, universal_newlines=True, cwd=folder)
                stdout_data, stderr_data = p.communicate()
                if stdout_data:
                    logger.debug("Captured stdout hook:\n%s", stdout_data)
                if stderr_data:
                    logger.debug("Captured stderr hook:\n%s", stderr_data)
                if p.returncode != 0:
                    raise subprocess.CalledProcessError(p.returncode, p.args)