discover.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. # This file is part of Radicale - CalDAV and CardDAV server
  2. # Copyright © 2014 Jean-Marc Martins
  3. # Copyright © 2012-2017 Guillaume Ayoub
  4. # Copyright © 2017-2018 Unrud <unrud@outlook.com>
  5. #
  6. # This library is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This library is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
  18. import base64
  19. import os
  20. import posixpath
  21. from typing import Callable, ContextManager, Iterator, Optional, Set, cast
  22. from radicale import pathutils, types
  23. from radicale.log import logger
  24. from radicale.storage import multifilesystem
  25. from radicale.storage.multifilesystem.base import StorageBase
  @types.contextmanager
  def _null_child_context_manager(
          path: str, href: Optional[str]) -> Iterator[None]:
      """Do-nothing context manager with the child-context signature.

      ``discover`` falls back to this when the caller supplies no
      ``child_context_manager``. Both arguments are accepted only so the
      call shape matches; neither is used, and the managed block simply runs.
      """
      yield None
  class StoragePartDiscover(StorageBase):
      """Storage part that implements ``discover`` on top of the filesystem."""

      def discover(
              self, path: str, depth: str = "0",
              child_context_manager: Optional[
                  Callable[[str, Optional[str]], ContextManager[None]]] = None,
              user_groups: Optional[Set[str]] = None
              ) -> Iterator[types.CollectionOrItem]:
          """Yield the collection or item found at ``path`` and its children.

          The path is stripped and resolved below the collection root folder
          (which is created if missing). Nothing is yielded when the path is
          unsafe or does not exist on disk.

          * If the path points to a file, the matching item of the parent
            collection is yielded (when it can be loaded) and iteration stops.
          * If the path points to a directory, the collection itself is
            yielded. When ``depth`` is anything other than ``"0"``, the
            collection's items, then its sub-directories (as collections),
            then the existing ``GROUPS/<base64(group)>`` collections of
            ``user_groups`` follow.

          :param path: requested path; expected to be sanitized already.
          :param depth: ``"0"`` to yield only the target itself.
          :param child_context_manager: callable invoked as
              ``(sane_path, href)`` for items and ``(sane_child_path, None)``
              for child collections; the returned context is held while the
              child is loaded and yielded. Defaults to a no-op.
          :param user_groups: group names whose group collections should be
              included; ``None`` (the default) means no groups.
          """
          # NOTE: ``self`` is expected to be a multifilesystem.Storage; the
          # casts below rely on that (a runtime assert is intentionally not
          # performed here).
          if child_context_manager is None:
              child_context_manager = _null_child_context_manager
          # Path should already be sanitized
          sane_path = pathutils.strip_path(path)
          attributes = sane_path.split("/") if sane_path else []

          folder = self._get_collection_root_folder()
          # Create the root collection
          self._makedirs_synced(folder)
          try:
              filesystem_path = pathutils.path_to_filesystem(folder, sane_path)
          except ValueError as e:
              # Path is unsafe
              logger.debug("Unsafe path %r requested from storage: %s",
                           sane_path, e, exc_info=True)
              return

          # Check if the path exists and if it leads to a collection or an item
          href: Optional[str]
          if os.path.isdir(filesystem_path):
              href = None
          elif attributes and os.path.isfile(filesystem_path):
              # The last path component names the item; the remainder is the
              # path of the collection that contains it.
              href = attributes.pop()
          else:
              return

          sane_path = "/".join(attributes)
          collection = self._collection_class(
              cast(multifilesystem.Storage, self),
              pathutils.unstrip_path(sane_path, True))

          if href:
              item = collection._get(href)
              if item is not None:
                  yield item
              return

          yield collection

          if depth == "0":
              return

          # Items stored directly in the collection.
          for item_href in collection._list():
              with child_context_manager(sane_path, item_href):
                  item = collection._get(item_href)
                  if item is not None:
                      yield item

          # Sub-directories are child collections. The ``with`` block closes
          # the directory iterator even if the consumer abandons this
          # generator between two yields.
          with os.scandir(filesystem_path) as entries:
              for entry in entries:
                  if not entry.is_dir():
                      continue
                  name = entry.name
                  if not pathutils.is_safe_filesystem_path_component(name):
                      # ".Radicale*" directories are skipped silently; other
                      # unsafe names are skipped with a debug message.
                      if not name.startswith(".Radicale"):
                          logger.debug("Skipping collection %r in %r",
                                       name, sane_path)
                      continue
                  sane_child_path = posixpath.join(sane_path, name)
                  child_path = pathutils.unstrip_path(sane_child_path, True)
                  with child_context_manager(sane_child_path, None):
                      yield self._collection_class(
                          cast(multifilesystem.Storage, self), child_path)

          # Group collections live below "GROUPS/", named by the base64
          # encoding of the group name.
          for group in user_groups or ():
              encoded = base64.b64encode(
                  group.encode('utf-8')).decode('ascii')
              logger.debug("searching for group calendar %s %s",
                           group, encoded)
              sane_child_path = f"GROUPS/{encoded}"
              try:
                  group_folder = pathutils.path_to_filesystem(
                      folder, sane_child_path)
              except ValueError as e:
                  # Standard base64 output may contain "/", so the encoded
                  # name is not guaranteed to be one safe path component.
                  # Treat it like the unsafe request path above: log and skip.
                  logger.debug(
                      "Unsafe group path %r requested from storage: %s",
                      sane_child_path, e, exc_info=True)
                  continue
              if not os.path.isdir(group_folder):
                  continue
              child_path = f"/GROUPS/{encoded}/"
              with child_context_manager(sane_child_path, None):
                  yield self._collection_class(
                      cast(multifilesystem.Storage, self), child_path)