base.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. # This file is part of Radicale - CalDAV and CardDAV server
  2. # Copyright © 2020 Unrud <unrud@outlook.com>
  3. #
  4. # This library is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This library is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU General Public License
  15. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
  16. import io
  17. import logging
  18. import posixpath
  19. import sys
  20. import xml.etree.ElementTree as ET
  21. from typing import Optional
  22. from radicale import (auth, config, hook, httputils, pathutils, rights,
  23. storage, types, web, xmlutils)
  24. from radicale.log import logger
  25. # HACK: https://github.com/tiran/defusedxml/issues/54
  26. import defusedxml.ElementTree as DefusedET # isort:skip
  27. sys.modules["xml.etree"].ElementTree = ET # type:ignore[attr-defined]
  28. class ApplicationBase:
  29. configuration: config.Configuration
  30. _auth: auth.BaseAuth
  31. _storage: storage.BaseStorage
  32. _rights: rights.BaseRights
  33. _web: web.BaseWeb
  34. _encoding: str
  35. _hook: hook.BaseHook
  36. def __init__(self, configuration: config.Configuration) -> None:
  37. self.configuration = configuration
  38. self._auth = auth.load(configuration)
  39. self._storage = storage.load(configuration)
  40. self._rights = rights.load(configuration)
  41. self._web = web.load(configuration)
  42. self._encoding = configuration.get("encoding", "request")
  43. self._hook = hook.load(configuration)
  44. def _read_xml_request_body(self, environ: types.WSGIEnviron
  45. ) -> Optional[ET.Element]:
  46. content = httputils.decode_request(
  47. self.configuration, environ,
  48. httputils.read_raw_request_body(self.configuration, environ))
  49. if not content:
  50. return None
  51. try:
  52. xml_content = DefusedET.fromstring(content)
  53. except ET.ParseError as e:
  54. logger.debug("Request content (Invalid XML):\n%s", content)
  55. raise RuntimeError("Failed to parse XML: %s" % e) from e
  56. if logger.isEnabledFor(logging.DEBUG):
  57. logger.debug("Request content:\n%s",
  58. xmlutils.pretty_xml(xml_content))
  59. return xml_content
  60. def _xml_response(self, xml_content: ET.Element) -> bytes:
  61. if logger.isEnabledFor(logging.DEBUG):
  62. logger.debug("Response content:\n%s",
  63. xmlutils.pretty_xml(xml_content))
  64. f = io.BytesIO()
  65. ET.ElementTree(xml_content).write(f, encoding=self._encoding,
  66. xml_declaration=True)
  67. return f.getvalue()
  68. def _webdav_error_response(self, status: int, human_tag: str
  69. ) -> types.WSGIResponse:
  70. """Generate XML error response."""
  71. headers = {"Content-Type": "text/xml; charset=%s" % self._encoding}
  72. content = self._xml_response(xmlutils.webdav_error(human_tag))
  73. return status, headers, content
  74. class Access:
  75. """Helper class to check access rights of an item"""
  76. user: str
  77. path: str
  78. parent_path: str
  79. permissions: str
  80. _rights: rights.BaseRights
  81. _parent_permissions: Optional[str]
  82. def __init__(self, rights: rights.BaseRights, user: str, path: str
  83. ) -> None:
  84. self._rights = rights
  85. self.user = user
  86. self.path = path
  87. self.parent_path = pathutils.unstrip_path(
  88. posixpath.dirname(pathutils.strip_path(path)), True)
  89. self.permissions = self._rights.authorization(self.user, self.path)
  90. self._parent_permissions = None
  91. @property
  92. def parent_permissions(self) -> str:
  93. if self.path == self.parent_path:
  94. return self.permissions
  95. if self._parent_permissions is None:
  96. self._parent_permissions = self._rights.authorization(
  97. self.user, self.parent_path)
  98. return self._parent_permissions
  99. def check(self, permission: str,
  100. item: Optional[types.CollectionOrItem] = None) -> bool:
  101. if permission not in "rw":
  102. raise ValueError("Invalid permission argument: %r" % permission)
  103. if not item:
  104. permissions = permission + permission.upper()
  105. parent_permissions = permission
  106. elif isinstance(item, storage.BaseCollection):
  107. if item.tag:
  108. permissions = permission
  109. else:
  110. permissions = permission.upper()
  111. parent_permissions = ""
  112. else:
  113. permissions = ""
  114. parent_permissions = permission
  115. return bool(rights.intersect(self.permissions, permissions) or (
  116. self.path != self.parent_path and
  117. rights.intersect(self.parent_permissions, parent_permissions)))