base.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. # This file is part of Radicale - CalDAV and CardDAV server
  2. # Copyright © 2020 Unrud <unrud@outlook.com>
  3. # Copyright © 2024-2024 Peter Bieringer <pb@bieringer.de>
  4. #
  5. # This library is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # (at your option) any later version.
  9. #
  10. # This library is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
  17. import io
  18. import logging
  19. import posixpath
  20. import sys
  21. import xml.etree.ElementTree as ET
  22. from typing import Optional
  23. from radicale import (auth, config, hook, httputils, pathutils, rights,
  24. storage, types, utils, web, xmlutils)
  25. from radicale.log import logger
  26. # HACK: https://github.com/tiran/defusedxml/issues/54
  27. import defusedxml.ElementTree as DefusedET # isort:skip
  28. sys.modules["xml.etree"].ElementTree = ET # type:ignore[attr-defined]
  29. class ApplicationBase:
  30. configuration: config.Configuration
  31. _auth: auth.BaseAuth
  32. _storage: storage.BaseStorage
  33. _rights: rights.BaseRights
  34. _web: web.BaseWeb
  35. _encoding: str
  36. _max_resource_size: int
  37. _permit_delete_collection: bool
  38. _permit_overwrite_collection: bool
  39. _strict_preconditions: bool
  40. _hook: hook.BaseHook
  41. def __init__(self, configuration: config.Configuration) -> None:
  42. self.configuration = configuration
  43. self._auth = auth.load(configuration)
  44. self._storage = storage.load(configuration)
  45. self._rights = rights.load(configuration)
  46. self._web = web.load(configuration)
  47. self._encoding = configuration.get("encoding", "request")
  48. self._log_bad_put_request_content = configuration.get("logging", "bad_put_request_content")
  49. self._response_content_on_debug = configuration.get("logging", "response_content_on_debug")
  50. self._request_content_on_debug = configuration.get("logging", "request_content_on_debug")
  51. self._hook = hook.load(configuration)
  52. def _read_xml_request_body(self, environ: types.WSGIEnviron
  53. ) -> Optional[ET.Element]:
  54. content = httputils.decode_request(
  55. self.configuration, environ,
  56. httputils.read_raw_request_body(self.configuration, environ))
  57. if not content:
  58. return None
  59. try:
  60. xml_content = DefusedET.fromstring(content)
  61. except ET.ParseError as e:
  62. logger.debug("Request content (Invalid XML):\n%s", content)
  63. raise RuntimeError("Failed to parse XML: %s" % e) from e
  64. if logger.isEnabledFor(logging.DEBUG):
  65. if self._request_content_on_debug:
  66. logger.debug("Request content (XML):\n%s",
  67. utils.textwrap_str(xmlutils.pretty_xml(xml_content)))
  68. else:
  69. logger.debug("Request content (XML): suppressed by config/option [logging] request_content_on_debug")
  70. return xml_content
  71. def _xml_response(self, xml_content: ET.Element) -> bytes:
  72. if logger.isEnabledFor(logging.DEBUG):
  73. if self._response_content_on_debug:
  74. logger.debug("Response content (XML):\n%s",
  75. utils.textwrap_str(xmlutils.pretty_xml(xml_content)))
  76. else:
  77. logger.debug("Response content (XML): suppressed by config/option [logging] response_content_on_debug")
  78. f = io.BytesIO()
  79. ET.ElementTree(xml_content).write(f, encoding=self._encoding,
  80. xml_declaration=True)
  81. return f.getvalue()
  82. def _webdav_error_response(self, status: int, human_tag: str
  83. ) -> types.WSGIResponse:
  84. """Generate XML error response."""
  85. headers = {"Content-Type": "text/xml; charset=%s" % self._encoding}
  86. content = self._xml_response(xmlutils.webdav_error(human_tag))
  87. return status, headers, content, None
  88. class Access:
  89. """Helper class to check access rights of an item"""
  90. user: str
  91. path: str
  92. parent_path: str
  93. permissions: str
  94. _rights: rights.BaseRights
  95. _parent_permissions: Optional[str]
  96. def __init__(self, rights: rights.BaseRights, user: str, path: str
  97. ) -> None:
  98. self._rights = rights
  99. self.user = user
  100. self.path = path
  101. self.parent_path = pathutils.unstrip_path(
  102. posixpath.dirname(pathutils.strip_path(path)), True)
  103. self.permissions = self._rights.authorization(self.user, self.path)
  104. self._parent_permissions = None
  105. @property
  106. def parent_permissions(self) -> str:
  107. if self.path == self.parent_path:
  108. return self.permissions
  109. if self._parent_permissions is None:
  110. self._parent_permissions = self._rights.authorization(
  111. self.user, self.parent_path)
  112. return self._parent_permissions
  113. def check(self, permission: str,
  114. item: Optional[types.CollectionOrItem] = None) -> bool:
  115. if permission not in "rwdDoO":
  116. raise ValueError("Invalid permission argument: %r" % permission)
  117. if not item:
  118. permissions = permission + permission.upper()
  119. parent_permissions = permission
  120. elif isinstance(item, storage.BaseCollection):
  121. if item.tag:
  122. permissions = permission
  123. else:
  124. permissions = permission.upper()
  125. parent_permissions = ""
  126. else:
  127. permissions = ""
  128. parent_permissions = permission
  129. return bool(rights.intersect(self.permissions, permissions) or (
  130. self.path != self.parent_path and
  131. rights.intersect(self.parent_permissions, parent_permissions)))