create_collection.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. # This file is part of Radicale - CalDAV and CardDAV server
  2. # Copyright © 2014 Jean-Marc Martins
  3. # Copyright © 2012-2017 Guillaume Ayoub
  4. # Copyright © 2017-2021 Unrud <unrud@outlook.com>
  5. # Copyright © 2024-2025 Peter Bieringer <pb@bieringer.de>
  6. #
  7. # This library is free software: you can redistribute it and/or modify
  8. # it under the terms of the GNU General Public License as published by
  9. # the Free Software Foundation, either version 3 of the License, or
  10. # (at your option) any later version.
  11. #
  12. # This library is distributed in the hope that it will be useful,
  13. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. # GNU General Public License for more details.
  16. #
  17. # You should have received a copy of the GNU General Public License
  18. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
  19. import os
  20. from tempfile import TemporaryDirectory
  21. from typing import Dict, Iterable, List, Optional, Tuple, cast
  22. import radicale.item as radicale_item
  23. from radicale import pathutils
  24. from radicale.log import logger
  25. from radicale.storage import multifilesystem
  26. from radicale.storage.multifilesystem.base import StorageBase
  27. class StoragePartCreateCollection(StorageBase):
  28. def _discover_existing_items_pre_overwrite(self,
  29. tmp_collection: "multifilesystem.Collection",
  30. dst_path: str) -> Tuple[Dict[str, radicale_item.Item], List[str]]:
  31. """Discover existing items in the collection before overwriting them."""
  32. existing_items = {}
  33. new_item_hrefs = []
  34. existing_collection = self._collection_class(
  35. cast(multifilesystem.Storage, self),
  36. pathutils.unstrip_path(dst_path, True))
  37. existing_item_hrefs = set(existing_collection._list())
  38. tmp_collection_hrefs = set(tmp_collection._list())
  39. for item_href in tmp_collection_hrefs:
  40. if item_href not in existing_item_hrefs:
  41. # Item in temporary collection does not exist in the existing collection (is new)
  42. new_item_hrefs.append(item_href)
  43. continue
  44. # Item exists in both collections, grab the existing item for reference
  45. try:
  46. item = existing_collection._get(item_href, verify_href=False)
  47. if item is not None:
  48. existing_items[item_href] = item
  49. except Exception:
  50. # TODO: Log exception?
  51. continue
  52. return existing_items, new_item_hrefs
  53. def create_collection(self, href: str,
  54. items: Optional[Iterable[radicale_item.Item]] = None,
  55. props=None) -> Tuple["multifilesystem.Collection", Dict[str, radicale_item.Item], List[str]]:
  56. folder = self._get_collection_root_folder()
  57. # Path should already be sanitized
  58. sane_path = pathutils.strip_path(href)
  59. filesystem_path = pathutils.path_to_filesystem(folder, sane_path)
  60. logger.debug("Create collection: %r" % filesystem_path)
  61. if not props:
  62. self._makedirs_synced(filesystem_path)
  63. return self._collection_class(
  64. cast(multifilesystem.Storage, self),
  65. pathutils.unstrip_path(sane_path, True)), {}, []
  66. parent_dir = os.path.dirname(filesystem_path)
  67. self._makedirs_synced(parent_dir)
  68. replaced_items: Dict[str, radicale_item.Item] = {}
  69. new_item_hrefs: List[str] = []
  70. # Create a temporary directory with an unsafe name
  71. try:
  72. with TemporaryDirectory(prefix=".Radicale.tmp-", dir=parent_dir
  73. ) as tmp_dir:
  74. # The temporary directory itself can't be renamed
  75. tmp_filesystem_path = os.path.join(tmp_dir, "collection")
  76. os.makedirs(tmp_filesystem_path)
  77. col = self._collection_class(
  78. cast(multifilesystem.Storage, self),
  79. pathutils.unstrip_path(sane_path, True),
  80. filesystem_path=tmp_filesystem_path)
  81. col.set_meta(props)
  82. if items is not None:
  83. if props.get("tag") == "VCALENDAR":
  84. col._upload_all_nonatomic(items, suffix=".ics")
  85. elif props.get("tag") == "VADDRESSBOOK":
  86. col._upload_all_nonatomic(items, suffix=".vcf")
  87. if os.path.lexists(filesystem_path):
  88. replaced_items, new_item_hrefs = self._discover_existing_items_pre_overwrite(
  89. tmp_collection=col,
  90. dst_path=sane_path)
  91. pathutils.rename_exchange(tmp_filesystem_path, filesystem_path)
  92. else:
  93. # If the destination path does not exist, obviously all items are new
  94. new_item_hrefs = list(col._list())
  95. os.rename(tmp_filesystem_path, filesystem_path)
  96. self._sync_directory(parent_dir)
  97. except Exception as e:
  98. raise ValueError("Failed to create collection %r as %r %s" %
  99. (href, filesystem_path, e)) from e
  100. # TODO: Return new-old pairs and just-new items (new vs updated)
  101. return self._collection_class(
  102. cast(multifilesystem.Storage, self),
  103. pathutils.unstrip_path(sane_path, True)), replaced_items, new_item_hrefs