# multifilesystem.py
# This file is part of Radicale Server - Calendar Server
# Copyright © 2014 Jean-Marc Martins
# Copyright © 2012-2017 Guillaume Ayoub
# Copyright © 2017-2018 Unrud <unrud@outlook.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Radicale.  If not, see <http://www.gnu.org/licenses/>.
import binascii
import contextlib
import json
import logging
import os
import pickle
import posixpath
import shlex
import subprocess
import time
from contextlib import contextmanager
from hashlib import md5
from itertools import chain
from tempfile import NamedTemporaryFile, TemporaryDirectory

import vobject

from radicale import item as radicale_item
from radicale import pathutils, storage
from radicale.log import logger
class Collection(storage.BaseCollection):
    """Collection stored in several files per calendar."""

    @classmethod
    def static_init(cls):
        """One-time class setup.

        Creates the storage folder if missing and initializes the global
        inter-process read/write lock that protects the whole storage.
        """
        # init storage lock
        folder = os.path.expanduser(cls.configuration.get(
            "storage", "filesystem_folder"))
        cls._makedirs_synced(folder)
        lock_path = os.path.join(folder, ".Radicale.lock")
        cls._lock = pathutils.RwLock(lock_path)
    def __init__(self, path, filesystem_path=None):
        """Initialize the collection at the sanitized *path*.

        *filesystem_path* overrides the on-disk location; it is used by
        ``create_collection`` to build a collection inside a temporary
        directory before atomically renaming it into place.
        """
        folder = self._get_collection_root_folder()
        # Path should already be sanitized
        self.path = pathutils.strip_path(path)
        self._encoding = self.configuration.get("encoding", "stock")
        if filesystem_path is None:
            filesystem_path = pathutils.path_to_filesystem(folder, self.path)
        self._filesystem_path = filesystem_path
        self._props_path = os.path.join(
            self._filesystem_path, ".Radicale.props")
        # Lazily-filled caches (see get_meta / etag); invalidated while the
        # storage is write-locked.
        self._meta_cache = None
        self._etag_cache = None
        # Set once the item cache was cleaned after an external edit
        self._item_cache_cleaned = False
  59. @classmethod
  60. def _get_collection_root_folder(cls):
  61. filesystem_folder = os.path.expanduser(
  62. cls.configuration.get("storage", "filesystem_folder"))
  63. return os.path.join(filesystem_folder, "collection-root")
    @contextmanager
    def _atomic_write(self, path, mode="w", newline=None, sync_directory=True,
                      replace_fn=os.replace):
        """Write a file atomically.

        Yields a temporary file created in the target directory; on
        success the file is flushed, fsynced (if enabled) and renamed
        over *path* via *replace_fn*.  On any failure the temporary file
        is removed and the exception propagates.  If *sync_directory* is
        true, the containing directory is synced afterwards.
        """
        directory = os.path.dirname(path)
        tmp = NamedTemporaryFile(
            mode=mode, dir=directory, delete=False, prefix=".Radicale.tmp-",
            newline=newline, encoding=None if "b" in mode else self._encoding)
        try:
            yield tmp
            tmp.flush()
            try:
                self._fsync(tmp.fileno())
            except OSError as e:
                raise RuntimeError("Fsync'ing file %r failed: %s" %
                                   (path, e)) from e
            tmp.close()
            replace_fn(tmp.name, path)
        except BaseException:
            # Remove the temporary file on any failure, including
            # exceptions thrown into the generator by the caller.
            tmp.close()
            os.remove(tmp.name)
            raise
        if sync_directory:
            self._sync_directory(directory)
    @classmethod
    def _fsync(cls, fd):
        """Sync file descriptor *fd* to disk, if enabled in the config."""
        if cls.configuration.getboolean("internal", "filesystem_fsync"):
            pathutils.fsync(fd)
    @classmethod
    def _sync_directory(cls, path):
        """Sync directory to disk.

        This only works on POSIX and does nothing on other systems.
        """
        if not cls.configuration.getboolean("internal", "filesystem_fsync"):
            return
        if os.name == "posix":
            try:
                # Directories must be opened read-only to fsync them
                fd = os.open(path, 0)
                try:
                    cls._fsync(fd)
                finally:
                    os.close(fd)
            except OSError as e:
                raise RuntimeError("Fsync'ing directory %r failed: %s" %
                                   (path, e)) from e
    @classmethod
    def _makedirs_synced(cls, filesystem_path):
        """Recursively create a directory and its parents in a sync'ed way.

        This method acts silently when the folder already exists.
        """
        if os.path.isdir(filesystem_path):
            return
        parent_filesystem_path = os.path.dirname(filesystem_path)
        # Prevent infinite loop
        if filesystem_path != parent_filesystem_path:
            # Create parent dirs recursively
            cls._makedirs_synced(parent_filesystem_path)
        # Possible race!  Another process might create the folder between
        # the isdir check above and this call; exist_ok tolerates that.
        os.makedirs(filesystem_path, exist_ok=True)
        cls._sync_directory(parent_filesystem_path)
    @classmethod
    def discover(cls, path, depth="0", child_context_manager=(
            lambda path, href=None: contextlib.ExitStack())):
        """Discover what is stored at *path*.

        Yields the collection or single item at *path*.  When *path*
        names a collection and ``depth != "0"``, additionally yields the
        collection's items and its direct sub-collections.
        *child_context_manager* wraps the generation of every child; it
        is used by ``verify`` to catch and count per-child errors.
        """
        # Path should already be sanitized
        sane_path = pathutils.strip_path(path)
        attributes = sane_path.split("/") if sane_path else []
        folder = cls._get_collection_root_folder()
        # Create the root collection
        cls._makedirs_synced(folder)
        try:
            filesystem_path = pathutils.path_to_filesystem(folder, sane_path)
        except ValueError as e:
            # Path is unsafe
            logger.debug("Unsafe path %r requested from storage: %s",
                         sane_path, e, exc_info=True)
            return
        # Check if the path exists and if it leads to a collection or an item
        if not os.path.isdir(filesystem_path):
            if attributes and os.path.isfile(filesystem_path):
                # The last path component is an item inside its parent
                # collection
                href = attributes.pop()
            else:
                return
        else:
            href = None
        sane_path = "/".join(attributes)
        collection = cls(pathutils.unstrip_path(sane_path, True))
        if href:
            # *path* named a single item: yield it and stop
            yield collection._get(href)
            return
        yield collection
        if depth == "0":
            return
        # Yield all items of the collection
        for href in collection._list():
            with child_context_manager(sane_path, href):
                yield collection._get(href)
        # Yield all direct sub-collections
        for entry in os.scandir(filesystem_path):
            if not entry.is_dir():
                continue
            href = entry.name
            if not pathutils.is_safe_filesystem_path_component(href):
                if not href.startswith(".Radicale"):
                    logger.debug("Skipping collection %r in %r",
                                 href, sane_path)
                continue
            sane_child_path = posixpath.join(sane_path, href)
            child_path = pathutils.unstrip_path(sane_child_path, True)
            with child_context_manager(sane_child_path):
                yield cls(child_path)
  171. @classmethod
  172. def verify(cls):
  173. item_errors = collection_errors = 0
  174. @contextlib.contextmanager
  175. def exception_cm(sane_path, href=None):
  176. nonlocal item_errors, collection_errors
  177. try:
  178. yield
  179. except Exception as e:
  180. if href:
  181. item_errors += 1
  182. name = "item %r in %r" % (href, sane_path)
  183. else:
  184. collection_errors += 1
  185. name = "collection %r" % sane_path
  186. logger.error("Invalid %s: %s", name, e, exc_info=True)
  187. remaining_sane_paths = [""]
  188. while remaining_sane_paths:
  189. sane_path = remaining_sane_paths.pop(0)
  190. path = pathutils.unstrip_path(sane_path, True)
  191. logger.debug("Verifying collection %r", sane_path)
  192. with exception_cm(sane_path):
  193. saved_item_errors = item_errors
  194. collection = None
  195. uids = set()
  196. has_child_collections = False
  197. for item in cls.discover(path, "1", exception_cm):
  198. if not collection:
  199. collection = item
  200. collection.get_meta()
  201. continue
  202. if isinstance(item, storage.BaseCollection):
  203. has_child_collections = True
  204. remaining_sane_paths.append(item.path)
  205. elif item.uid in uids:
  206. cls.logger.error(
  207. "Invalid item %r in %r: UID conflict %r",
  208. item.href, sane_path, item.uid)
  209. else:
  210. uids.add(item.uid)
  211. logger.debug("Verified item %r in %r",
  212. item.href, sane_path)
  213. if item_errors == saved_item_errors:
  214. collection.sync()
  215. if has_child_collections and collection.get_meta("tag"):
  216. cls.logger.error("Invalid collection %r: %r must not have "
  217. "child collections", sane_path,
  218. collection.get_meta("tag"))
  219. return item_errors == 0 and collection_errors == 0
    @classmethod
    def create_collection(cls, href, items=None, props=None):
        """Create (or replace) the collection at *href*.

        Without *props* the folder is merely created.  With *props* the
        collection is assembled in a temporary directory -- properties,
        then optionally *items* -- and renamed into place so readers
        never observe a half-written collection.
        """
        folder = cls._get_collection_root_folder()
        # Path should already be sanitized
        sane_path = pathutils.strip_path(href)
        filesystem_path = pathutils.path_to_filesystem(folder, sane_path)
        if not props:
            cls._makedirs_synced(filesystem_path)
            return cls(pathutils.unstrip_path(sane_path, True))
        parent_dir = os.path.dirname(filesystem_path)
        cls._makedirs_synced(parent_dir)
        # Create a temporary directory with an unsafe name
        with TemporaryDirectory(
                prefix=".Radicale.tmp-", dir=parent_dir) as tmp_dir:
            # The temporary directory itself can't be renamed
            tmp_filesystem_path = os.path.join(tmp_dir, "collection")
            os.makedirs(tmp_filesystem_path)
            self = cls(pathutils.unstrip_path(sane_path, True),
                       filesystem_path=tmp_filesystem_path)
            self.set_meta(props)
            if items is not None:
                if props.get("tag") == "VCALENDAR":
                    self._upload_all_nonatomic(items, suffix=".ics")
                elif props.get("tag") == "VADDRESSBOOK":
                    self._upload_all_nonatomic(items, suffix=".vcf")
            # This operation is not atomic on the filesystem level but it's
            # very unlikely that one rename operations succeeds while the
            # other fails or that only one gets written to disk.
            if os.path.exists(filesystem_path):
                os.rename(filesystem_path, os.path.join(tmp_dir, "delete"))
            os.rename(tmp_filesystem_path, filesystem_path)
            cls._sync_directory(parent_dir)
        return cls(pathutils.unstrip_path(sane_path, True))
    def _upload_all_nonatomic(self, items, suffix=""):
        """Upload a new set of items.

        This takes a list of vobject items and
        uploads them nonatomic and without existence checks.
        """
        cache_folder = os.path.join(self._filesystem_path,
                                    ".Radicale.cache", "item")
        self._makedirs_synced(cache_folder)
        hrefs = set()
        for item in items:
            uid = item.uid
            try:
                cache_content = self._item_cache_content(item)
            except Exception as e:
                raise ValueError(
                    "Failed to store item %r in temporary collection %r: %s" %
                    (uid, self.path, e)) from e
            # Candidate file names, tried in order until one succeeds:
            # the UID itself (on known OSes), then the UID's etag, then a
            # freshly generated random name.
            href_candidates = []
            if os.name in ("nt", "posix"):
                href_candidates.append(
                    lambda: uid if uid.lower().endswith(suffix.lower())
                    else uid + suffix)
            href_candidates.extend((
                lambda: radicale_item.get_etag(uid).strip('"') + suffix,
                lambda: radicale_item.find_available_uid(hrefs.__contains__,
                                                         suffix)))
            href = None

            def replace_fn(source, target):
                # Passed to _atomic_write instead of os.replace: picks the
                # first candidate name the filesystem accepts.  The
                # *target* argument is ignored (the "ign" path below).
                nonlocal href
                while href_candidates:
                    href = href_candidates.pop(0)()
                    if href in hrefs:
                        continue
                    if not pathutils.is_safe_filesystem_path_component(href):
                        if not href_candidates:
                            raise pathutils.UnsafePathError(href)
                        continue
                    try:
                        return os.replace(source, pathutils.path_to_filesystem(
                            self._filesystem_path, href))
                    except OSError as e:
                        # EINVAL (posix) / ERROR_INVALID_NAME (nt): the
                        # name is invalid on this filesystem, try the next
                        # candidate.
                        if href_candidates and (
                                os.name == "posix" and e.errno == 22 or
                                os.name == "nt" and e.errno == 123):
                            continue
                        raise

            with self._atomic_write(os.path.join(self._filesystem_path, "ign"),
                                    newline="", sync_directory=False,
                                    replace_fn=replace_fn) as f:
                f.write(item.serialize())
            hrefs.add(href)
            with self._atomic_write(os.path.join(cache_folder, href), "wb",
                                    sync_directory=False) as f:
                pickle.dump(cache_content, f)
        self._sync_directory(cache_folder)
        self._sync_directory(self._filesystem_path)
  309. @classmethod
  310. def move(cls, item, to_collection, to_href):
  311. if not pathutils.is_safe_filesystem_path_component(to_href):
  312. raise pathutils.UnsafePathError(to_href)
  313. os.replace(
  314. pathutils.path_to_filesystem(
  315. item.collection._filesystem_path, item.href),
  316. pathutils.path_to_filesystem(
  317. to_collection._filesystem_path, to_href))
  318. cls._sync_directory(to_collection._filesystem_path)
  319. if item.collection._filesystem_path != to_collection._filesystem_path:
  320. cls._sync_directory(item.collection._filesystem_path)
  321. # Move the item cache entry
  322. cache_folder = os.path.join(item.collection._filesystem_path,
  323. ".Radicale.cache", "item")
  324. to_cache_folder = os.path.join(to_collection._filesystem_path,
  325. ".Radicale.cache", "item")
  326. cls._makedirs_synced(to_cache_folder)
  327. try:
  328. os.replace(os.path.join(cache_folder, item.href),
  329. os.path.join(to_cache_folder, to_href))
  330. except FileNotFoundError:
  331. pass
  332. else:
  333. cls._makedirs_synced(to_cache_folder)
  334. if cache_folder != to_cache_folder:
  335. cls._makedirs_synced(cache_folder)
  336. # Track the change
  337. to_collection._update_history_etag(to_href, item)
  338. item.collection._update_history_etag(item.href, None)
  339. to_collection._clean_history_cache()
  340. if item.collection._filesystem_path != to_collection._filesystem_path:
  341. item.collection._clean_history_cache()
    @classmethod
    def _clean_cache(cls, folder, names, max_age=None):
        """Delete all ``names`` in ``folder`` that are older than ``max_age``.

        With ``max_age=None`` every (safe) name is deleted.  Unsafe names
        are skipped; races with other processes are tolerated.
        """
        age_limit = time.time() - max_age if max_age is not None else None
        modified = False
        for name in names:
            if not pathutils.is_safe_filesystem_path_component(name):
                continue
            if age_limit is not None:
                try:
                    # Race: Another process might have deleted the file.
                    mtime = os.path.getmtime(os.path.join(folder, name))
                except FileNotFoundError:
                    continue
                if mtime > age_limit:
                    continue
            logger.debug("Found expired item in cache: %r", name)
            # Race: Another process might have deleted or locked the
            # file.
            try:
                os.remove(os.path.join(folder, name))
            except (FileNotFoundError, PermissionError):
                continue
            modified = True
        if modified:
            cls._sync_directory(folder)
    def _update_history_etag(self, href, item):
        """Updates and retrieves the history etag from the history cache.

        The history cache contains a file for each current and deleted item
        of the collection. These files contain the etag of the item (empty
        string for deleted items) and a history etag, which is a hash over
        the previous history etag and the etag separated by "/".
        """
        history_folder = os.path.join(self._filesystem_path,
                                      ".Radicale.cache", "history")
        try:
            with open(os.path.join(history_folder, href), "rb") as f:
                cache_etag, history_etag = pickle.load(f)
        except (FileNotFoundError, pickle.UnpicklingError, ValueError) as e:
            # Only warn about corrupt entries, not about missing ones
            if isinstance(e, (pickle.UnpicklingError, ValueError)):
                logger.warning(
                    "Failed to load history cache entry %r in %r: %s",
                    href, self.path, e, exc_info=True)
            cache_etag = ""
            # Initialize with random data to prevent collisions with cleaned
            # expired items.
            history_etag = binascii.hexlify(os.urandom(16)).decode("ascii")
        # Empty etag represents a deleted item
        etag = item.etag if item else ""
        if etag != cache_etag:
            self._makedirs_synced(history_folder)
            history_etag = radicale_item.get_etag(
                history_etag + "/" + etag).strip("\"")
            try:
                # Race: Other processes might have created and locked the
                # file.
                with self._atomic_write(os.path.join(history_folder, href),
                                        "wb") as f:
                    pickle.dump([etag, history_etag], f)
            except PermissionError:
                pass
        return history_etag
  403. def _get_deleted_history_hrefs(self):
  404. """Returns the hrefs of all deleted items that are still in the
  405. history cache."""
  406. history_folder = os.path.join(self._filesystem_path,
  407. ".Radicale.cache", "history")
  408. try:
  409. for entry in os.scandir(history_folder):
  410. href = entry.name
  411. if not pathutils.is_safe_filesystem_path_component(href):
  412. continue
  413. if os.path.isfile(os.path.join(self._filesystem_path, href)):
  414. continue
  415. yield href
  416. except FileNotFoundError:
  417. pass
  418. def _clean_history_cache(self):
  419. # Delete all expired cache entries of deleted items.
  420. history_folder = os.path.join(self._filesystem_path,
  421. ".Radicale.cache", "history")
  422. self._clean_cache(history_folder, self._get_deleted_history_hrefs(),
  423. max_age=self.configuration.getint(
  424. "storage", "max_sync_token_age"))
    def sync(self, old_token=None):
        """Return ``(token, changes)`` since *old_token*.

        Raises ``ValueError`` for malformed tokens and for tokens whose
        stored state has expired or is damaged.
        """
        # The sync token has the form http://radicale.org/ns/sync/TOKEN_NAME
        # where TOKEN_NAME is the md5 hash of all history etags of present and
        # past items of the collection.

        def check_token_name(token_name):
            # A valid name is exactly an md5 hex digest
            if len(token_name) != 32:
                return False
            for c in token_name:
                if c not in "0123456789abcdef":
                    return False
            return True

        old_token_name = None
        if old_token:
            # Extract the token name from the sync token
            if not old_token.startswith("http://radicale.org/ns/sync/"):
                raise ValueError("Malformed token: %r" % old_token)
            old_token_name = old_token[len("http://radicale.org/ns/sync/"):]
            if not check_token_name(old_token_name):
                raise ValueError("Malformed token: %r" % old_token)
        # Get the current state and sync-token of the collection.
        state = {}
        token_name_hash = md5()
        # Find the history of all existing and deleted items
        for href, item in chain(
                ((item.href, item) for item in self.get_all()),
                ((href, None) for href in self._get_deleted_history_hrefs())):
            history_etag = self._update_history_etag(href, item)
            state[href] = history_etag
            token_name_hash.update((href + "/" + history_etag).encode("utf-8"))
        token_name = token_name_hash.hexdigest()
        token = "http://radicale.org/ns/sync/%s" % token_name
        if token_name == old_token_name:
            # Nothing changed
            return token, ()
        token_folder = os.path.join(self._filesystem_path,
                                    ".Radicale.cache", "sync-token")
        token_path = os.path.join(token_folder, token_name)
        old_state = {}
        if old_token_name:
            # load the old token state
            old_token_path = os.path.join(token_folder, old_token_name)
            try:
                # Race: Another process might have deleted the file.
                with open(old_token_path, "rb") as f:
                    old_state = pickle.load(f)
            except (FileNotFoundError, pickle.UnpicklingError,
                    ValueError) as e:
                if isinstance(e, (pickle.UnpicklingError, ValueError)):
                    logger.warning(
                        "Failed to load stored sync token %r in %r: %s",
                        old_token_name, self.path, e, exc_info=True)
                    # Delete the damaged file
                    try:
                        os.remove(old_token_path)
                    except (FileNotFoundError, PermissionError):
                        pass
                raise ValueError("Token not found: %r" % old_token)
        # write the new token state or update the modification time of
        # existing token state
        if not os.path.exists(token_path):
            self._makedirs_synced(token_folder)
            try:
                # Race: Other processes might have created and locked the
                # file.
                with self._atomic_write(token_path, "wb") as f:
                    pickle.dump(state, f)
            except PermissionError:
                pass
            else:
                # clean up old sync tokens and item cache
                self._clean_cache(token_folder, os.listdir(token_folder),
                                  max_age=self.configuration.getint(
                                      "storage", "max_sync_token_age"))
                self._clean_history_cache()
        else:
            # Try to update the modification time
            try:
                # Race: Another process might have deleted the file.
                os.utime(token_path)
            except FileNotFoundError:
                pass
        changes = []
        # Find all new, changed and deleted (that are still in the item cache)
        # items
        for href, history_etag in state.items():
            if history_etag != old_state.get(href):
                changes.append(href)
        # Find all deleted items that are no longer in the item cache
        for href, history_etag in old_state.items():
            if href not in state:
                changes.append(href)
        return token, changes
  516. def _list(self):
  517. for entry in os.scandir(self._filesystem_path):
  518. if not entry.is_file():
  519. continue
  520. href = entry.name
  521. if not pathutils.is_safe_filesystem_path_component(href):
  522. if not href.startswith(".Radicale"):
  523. logger.debug("Skipping item %r in %r", href, self.path)
  524. continue
  525. yield href
  526. def _item_cache_hash(self, raw_text):
  527. _hash = md5()
  528. _hash.update(storage.CACHE_VERSION)
  529. _hash.update(raw_text)
  530. return _hash.hexdigest()
  531. def _item_cache_content(self, item, cache_hash=None):
  532. text = item.serialize()
  533. if cache_hash is None:
  534. cache_hash = self._item_cache_hash(text.encode(self._encoding))
  535. return (cache_hash, item.uid, item.etag, text, item.name,
  536. item.component_name, *item.time_range)
    def _store_item_cache(self, href, item, cache_hash=None):
        """Serialize *item* and write its item cache entry.

        Returns the stored content tuple.  A ``PermissionError`` from the
        write (file locked by another process) is deliberately ignored.
        """
        cache_folder = os.path.join(self._filesystem_path, ".Radicale.cache",
                                    "item")
        content = self._item_cache_content(item, cache_hash)
        self._makedirs_synced(cache_folder)
        try:
            # Race: Other processes might have created and locked the
            # file.
            with self._atomic_write(os.path.join(cache_folder, href),
                                    "wb") as f:
                pickle.dump(content, f)
        except PermissionError:
            pass
        return content
  551. def _acquire_cache_lock(self, ns=""):
  552. if self._lock.locked == "w":
  553. return contextlib.ExitStack()
  554. cache_folder = os.path.join(self._filesystem_path, ".Radicale.cache")
  555. self._makedirs_synced(cache_folder)
  556. lock_path = os.path.join(cache_folder,
  557. ".Radicale.lock" + (".%s" % ns if ns else ""))
  558. lock = pathutils.RwLock(lock_path)
  559. return lock.acquire("w")
  560. def _load_item_cache(self, href, input_hash):
  561. cache_folder = os.path.join(self._filesystem_path, ".Radicale.cache",
  562. "item")
  563. cache_hash = uid = etag = text = name = tag = start = end = None
  564. try:
  565. with open(os.path.join(cache_folder, href), "rb") as f:
  566. cache_hash, *content = pickle.load(f)
  567. if cache_hash == input_hash:
  568. uid, etag, text, name, tag, start, end = content
  569. except FileNotFoundError as e:
  570. pass
  571. except (pickle.UnpicklingError, ValueError) as e:
  572. logger.warning("Failed to load item cache entry %r in %r: %s",
  573. href, self.path, e, exc_info=True)
  574. return cache_hash, uid, etag, text, name, tag, start, end
  575. def _clean_item_cache(self):
  576. cache_folder = os.path.join(self._filesystem_path, ".Radicale.cache",
  577. "item")
  578. self._clean_cache(cache_folder, (
  579. e.name for e in os.scandir(cache_folder) if not
  580. os.path.isfile(os.path.join(self._filesystem_path, e.name))))
    def _get(self, href, verify_href=True):
        """Load the item *href* from disk.

        Parsed metadata comes from the item cache when it is still valid;
        otherwise the item is parsed and the cache regenerated under the
        cache lock.  Returns ``None`` when the item doesn't exist or
        *href* can't be translated to a safe filesystem path.
        """
        if verify_href:
            try:
                if not pathutils.is_safe_filesystem_path_component(href):
                    raise pathutils.UnsafePathError(href)
                path = pathutils.path_to_filesystem(
                    self._filesystem_path, href)
            except ValueError as e:
                logger.debug(
                    "Can't translate name %r safely to filesystem in %r: %s",
                    href, self.path, e, exc_info=True)
                return None
        else:
            path = os.path.join(self._filesystem_path, href)
        try:
            with open(path, "rb") as f:
                raw_text = f.read()
        except (FileNotFoundError, IsADirectoryError):
            return None
        except PermissionError:
            # Windows raises ``PermissionError`` when ``path`` is a directory
            if (os.name == "nt" and
                    os.path.isdir(path) and os.access(path, os.R_OK)):
                return None
            raise
        # The hash of the component in the file system. This is used to check,
        # if the entry in the cache is still valid.
        input_hash = self._item_cache_hash(raw_text)
        cache_hash, uid, etag, text, name, tag, start, end = \
            self._load_item_cache(href, input_hash)
        if input_hash != cache_hash:
            with self._acquire_cache_lock("item"):
                # Lock the item cache to prevent multiple processes from
                # generating the same data in parallel.
                # This improves the performance for multiple requests.
                if self._lock.locked == "r":
                    # Check if another process created the file in the
                    # meantime
                    cache_hash, uid, etag, text, name, tag, start, end = \
                        self._load_item_cache(href, input_hash)
                if input_hash != cache_hash:
                    try:
                        vobject_items = tuple(vobject.readComponents(
                            raw_text.decode(self._encoding)))
                        radicale_item.check_and_sanitize_items(
                            vobject_items, tag=self.get_meta("tag"))
                        # Exactly one component is expected per item file
                        vobject_item, = vobject_items
                        temp_item = radicale_item.Item(
                            collection=self, vobject_item=vobject_item)
                        cache_hash, uid, etag, text, name, tag, start, end = \
                            self._store_item_cache(
                                href, temp_item, input_hash)
                    except Exception as e:
                        raise RuntimeError("Failed to load item %r in %r: %s" %
                                           (href, self.path, e)) from e
                    # Clean cache entries once after the data in the file
                    # system was edited externally.
                    if not self._item_cache_cleaned:
                        self._item_cache_cleaned = True
                        self._clean_item_cache()
        last_modified = time.strftime(
            "%a, %d %b %Y %H:%M:%S GMT",
            time.gmtime(os.path.getmtime(path)))
        # Don't keep reference to ``vobject_item``, because it requires a lot
        # of memory.
        return radicale_item.Item(
            collection=self, href=href, last_modified=last_modified, etag=etag,
            text=text, uid=uid, name=name, component_name=tag,
            time_range=(start, end))
    def get_multi(self, hrefs):
        """Yield ``(href, item)`` pairs for *hrefs*.

        ``item`` is ``None`` for names that are unsafe or that collide
        with an existing filesystem entry under a different name.
        """
        # It's faster to check for file name collisions here, because
        # we only need to call os.listdir once.
        files = None
        for href in hrefs:
            if files is None:
                # List dir after hrefs returned one item, the iterator may be
                # empty and the for-loop is never executed.
                files = os.listdir(self._filesystem_path)
            path = os.path.join(self._filesystem_path, href)
            if (not pathutils.is_safe_filesystem_path_component(href) or
                    href not in files and os.path.lexists(path)):
                logger.debug(
                    "Can't translate name safely to filesystem: %r", href)
                yield (href, None)
            else:
                yield (href, self._get(href, verify_href=False))
  666. def get_all(self):
  667. # We don't need to check for collissions, because the the file names
  668. # are from os.listdir.
  669. return (self._get(href, verify_href=False) for href in self._list())
    def upload(self, href, item):
        """Store *item* under *href* and return it re-read from storage.

        Raises ``UnsafePathError`` for unsafe names and ``ValueError``
        when the item can't be serialized into the cache.
        """
        if not pathutils.is_safe_filesystem_path_component(href):
            raise pathutils.UnsafePathError(href)
        try:
            # Write the cache entry first; serialization errors surface
            # here before the item file is touched.
            self._store_item_cache(href, item)
        except Exception as e:
            raise ValueError("Failed to store item %r in collection %r: %s" %
                             (href, self.path, e)) from e
        path = pathutils.path_to_filesystem(self._filesystem_path, href)
        with self._atomic_write(path, newline="") as fd:
            fd.write(item.serialize())
        # Clean the cache after the actual item is stored, or the cache entry
        # will be removed again.
        self._clean_item_cache()
        # Track the change
        self._update_history_etag(href, item)
        self._clean_history_cache()
        return self._get(href, verify_href=False)
    def delete(self, href=None):
        """Delete the item *href*, or the whole collection when *href*
        is ``None``.

        Raises ``UnsafePathError`` for unsafe names and
        ``ComponentNotFoundError`` for a missing item.
        """
        if href is None:
            # Delete the collection
            parent_dir = os.path.dirname(self._filesystem_path)
            try:
                os.rmdir(self._filesystem_path)
            except OSError:
                # Folder not empty: move it into a temporary directory,
                # which TemporaryDirectory removes on exit
                with TemporaryDirectory(
                        prefix=".Radicale.tmp-", dir=parent_dir) as tmp:
                    os.rename(self._filesystem_path, os.path.join(
                        tmp, os.path.basename(self._filesystem_path)))
                    self._sync_directory(parent_dir)
            else:
                self._sync_directory(parent_dir)
        else:
            # Delete an item
            if not pathutils.is_safe_filesystem_path_component(href):
                raise pathutils.UnsafePathError(href)
            path = pathutils.path_to_filesystem(self._filesystem_path, href)
            if not os.path.isfile(path):
                raise storage.ComponentNotFoundError(href)
            os.remove(path)
            self._sync_directory(os.path.dirname(path))
            # Track the change
            self._update_history_etag(href, None)
            self._clean_history_cache()
    def get_meta(self, key=None):
        """Return the collection properties, or the single property *key*.

        The parsed properties are cached; they are re-read on every call
        while the storage is write-locked.
        """
        # reuse cached value if the storage is read-only
        if self._lock.locked == "w" or self._meta_cache is None:
            try:
                try:
                    with open(self._props_path, encoding=self._encoding) as f:
                        self._meta_cache = json.load(f)
                except FileNotFoundError:
                    # No properties file yet: empty properties
                    self._meta_cache = {}
                radicale_item.check_and_sanitize_props(self._meta_cache)
            except ValueError as e:
                raise RuntimeError("Failed to load properties of collection "
                                   "%r: %s" % (self.path, e)) from e
        return self._meta_cache.get(key) if key else self._meta_cache
  728. def set_meta(self, props):
  729. with self._atomic_write(self._props_path, "w") as f:
  730. json.dump(props, f, sort_keys=True)
  731. @property
  732. def last_modified(self):
  733. relevant_files = chain(
  734. (self._filesystem_path,),
  735. (self._props_path,) if os.path.exists(self._props_path) else (),
  736. (os.path.join(self._filesystem_path, h) for h in self._list()))
  737. last = max(map(os.path.getmtime, relevant_files))
  738. return time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(last))
  739. @property
  740. def etag(self):
  741. # reuse cached value if the storage is read-only
  742. if self._lock.locked == "w" or self._etag_cache is None:
  743. self._etag_cache = super().etag
  744. return self._etag_cache
    @classmethod
    @contextmanager
    def acquire_lock(cls, mode, user=None):
        """Hold the global storage lock in *mode* for the ``with`` body.

        After the body finishes under a write lock, the configured
        storage hook is executed (still holding the lock) in the storage
        folder.
        """
        with cls._lock.acquire(mode):
            yield
            # execute hook
            hook = cls.configuration.get("storage", "hook")
            if mode == "w" and hook:
                folder = os.path.expanduser(cls.configuration.get(
                    "storage", "filesystem_folder"))
                logger.debug("Running hook")
                debug = logger.isEnabledFor(logging.DEBUG)
                # NOTE(review): the hook runs through the shell; the only
                # interpolated value is shell-quoted with shlex.quote.
                # The hook string itself comes from the configuration and
                # is trusted.
                p = subprocess.Popen(
                    hook % {"user": shlex.quote(user or "Anonymous")},
                    stdin=subprocess.DEVNULL,
                    stdout=subprocess.PIPE if debug else subprocess.DEVNULL,
                    stderr=subprocess.PIPE if debug else subprocess.DEVNULL,
                    shell=True, universal_newlines=True, cwd=folder)
                stdout_data, stderr_data = p.communicate()
                if stdout_data:
                    logger.debug("Captured stdout hook:\n%s", stdout_data)
                if stderr_data:
                    logger.debug("Captured stderr hook:\n%s", stderr_data)
                if p.returncode != 0:
                    raise subprocess.CalledProcessError(p.returncode, p.args)