__init__.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. # This file is part of Radicale - CalDAV and CardDAV server
  2. # Copyright © 2012-2017 Guillaume Ayoub
  3. # Copyright © 2017-2023 Unrud <unrud@outlook.com>
  4. # Copyright © 2024-2026 Peter Bieringer <pb@bieringer.de>
  5. #
  6. # This library is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This library is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. Tests for Radicale.
  20. """
  21. import base64
  22. import logging
  23. import os
  24. import platform
  25. import shutil
  26. import sys
  27. import tempfile
  28. import wsgiref.util
  29. import xml.etree.ElementTree as ET
  30. from io import BytesIO
  31. from typing import Any, Dict, List, Optional, Tuple, Union
  32. from urllib.parse import quote
  33. import defusedxml.ElementTree as DefusedET
  34. import vobject
  35. import radicale
  36. from radicale import app, config, types, utils, xmlutils
  37. RESPONSES = Dict[str, Union[int, Dict[str, Tuple[int, ET.Element]], vobject.base.Component]]
  38. # Enable debug output
  39. radicale.log.logger.setLevel(logging.DEBUG)
  40. class BaseTest:
  41. """Base class for tests."""
  42. colpath: str
  43. configuration: config.Configuration
  44. application: app.Application
  45. def setup_method(self) -> None:
  46. if os.environ.get("PYTHONPATH"):
  47. info = "with PYTHONPATH=%r " % os.environ.get("PYTHONPATH")
  48. else:
  49. info = ""
  50. logging.info("Testing Radicale %s(%s) as %s on %s", info, utils.packages_version(), utils.user_groups_as_string(), platform.platform())
  51. self.configuration = config.load()
  52. self.colpath = tempfile.mkdtemp()
  53. self.configure({
  54. "storage": {"filesystem_folder": self.colpath,
  55. # Disable syncing to disk for better performance
  56. "_filesystem_fsync": "False"},
  57. # Set incorrect authentication delay to a short duration
  58. "auth": {"delay": "0.001"}})
  59. def configure(self, config_: types.CONFIG) -> None:
  60. self.configuration.update(config_, "test", privileged=True)
  61. self.application = app.Application(self.configuration)
  62. def teardown_method(self) -> None:
  63. shutil.rmtree(self.colpath)
  64. def request(self, method: str, path: str, data: Optional[str] = None,
  65. check: Optional[int] = None, **kwargs
  66. ) -> Tuple[int, Dict[str, str], str]:
  67. """Send a request."""
  68. login = kwargs.pop("login", None)
  69. if login is not None and not isinstance(login, str):
  70. raise TypeError("login argument must be %r, not %r" %
  71. (str, type(login)))
  72. http_if_match = kwargs.pop("http_if_match", None)
  73. if http_if_match is not None and not isinstance(http_if_match, str):
  74. raise TypeError("http_if_match argument must be %r, not %r" %
  75. (str, type(http_if_match)))
  76. remote_useragent = kwargs.pop("remote_useragent", None)
  77. remote_host = kwargs.pop("remote_host", None)
  78. environ: Dict[str, Any] = {k.upper(): v for k, v in kwargs.items()}
  79. for k, v in environ.items():
  80. if not isinstance(v, str):
  81. raise TypeError("type of %r is %r, expected %r" %
  82. (k, type(v), str))
  83. encoding: str = self.configuration.get("encoding", "request")
  84. if login:
  85. environ["HTTP_AUTHORIZATION"] = "Basic " + base64.b64encode(
  86. login.encode(encoding)).decode()
  87. if http_if_match:
  88. environ["HTTP_IF_MATCH"] = http_if_match
  89. if remote_useragent:
  90. environ["HTTP_USER_AGENT"] = remote_useragent
  91. if remote_host:
  92. environ["REMOTE_ADDR"] = remote_host
  93. environ["REQUEST_METHOD"] = method.upper()
  94. environ["PATH_INFO"] = path
  95. if data is not None:
  96. data_bytes = data.encode(encoding)
  97. environ["wsgi.input"] = BytesIO(data_bytes)
  98. environ["CONTENT_LENGTH"] = str(len(data_bytes))
  99. environ["wsgi.errors"] = sys.stderr
  100. wsgiref.util.setup_testing_defaults(environ)
  101. status = headers = None
  102. def start_response(status_: str, headers_: List[Tuple[str, str]]
  103. ) -> None:
  104. nonlocal status, headers
  105. status = int(status_.split()[0])
  106. headers = dict(headers_)
  107. answers = list(self.application(environ, start_response))
  108. assert status is not None and headers is not None
  109. assert check is None or status == check, "%d != %d" % (status, check)
  110. return status, headers, answers[0].decode() if answers else ""
  111. @staticmethod
  112. def parse_responses(text: str) -> RESPONSES:
  113. xml = DefusedET.fromstring(text)
  114. assert xml.tag == xmlutils.make_clark("D:multistatus")
  115. path_responses: RESPONSES = {}
  116. for response in xml.findall(xmlutils.make_clark("D:response")):
  117. href = response.find(xmlutils.make_clark("D:href"))
  118. assert href.text not in path_responses
  119. prop_responses: Dict[str, Tuple[int, ET.Element]] = {}
  120. for propstat in response.findall(
  121. xmlutils.make_clark("D:propstat")):
  122. status = propstat.find(xmlutils.make_clark("D:status"))
  123. assert status.text.startswith("HTTP/1.1 ")
  124. status_code = int(status.text.split(" ")[1])
  125. for element in propstat.findall(
  126. "./%s/*" % xmlutils.make_clark("D:prop")):
  127. human_tag = xmlutils.make_human_tag(element.tag)
  128. assert human_tag not in prop_responses
  129. prop_responses[human_tag] = (status_code, element)
  130. status = response.find(xmlutils.make_clark("D:status"))
  131. if status is not None:
  132. assert not prop_responses
  133. assert status.text.startswith("HTTP/1.1 ")
  134. status_code = int(status.text.split(" ")[1])
  135. path_responses[href.text] = status_code
  136. else:
  137. path_responses[href.text] = prop_responses
  138. return path_responses
  139. @staticmethod
  140. def parse_free_busy(text: str) -> RESPONSES:
  141. path_responses: RESPONSES = {}
  142. path_responses[""] = vobject.readOne(text)
  143. return path_responses
  144. def get(self, path: str, check: Optional[int] = 200, **kwargs
  145. ) -> Tuple[int, str]:
  146. assert "data" not in kwargs
  147. status, _, answer = self.request("GET", path, check=check, **kwargs)
  148. return status, answer
  149. def post(self, path: str, data: Optional[str] = None,
  150. check: Optional[int] = 200, **kwargs) -> Tuple[int, str]:
  151. status, _, answer = self.request("POST", path, data, check=check,
  152. **kwargs)
  153. return status, answer
  154. def put(self, path: str, data: str, check: Optional[int] = 201,
  155. **kwargs) -> Tuple[int, str]:
  156. status, _, answer = self.request("PUT", path, data, check=check,
  157. **kwargs)
  158. return status, answer
  159. def propfind(self, path: str, data: Optional[str] = None,
  160. check: Optional[int] = 207, **kwargs
  161. ) -> Tuple[int, RESPONSES]:
  162. status, _, answer = self.request("PROPFIND", path, data, check=check,
  163. **kwargs)
  164. if status < 200 or 300 <= status:
  165. return status, {}
  166. assert answer is not None
  167. responses = self.parse_responses(answer)
  168. if kwargs.get("HTTP_DEPTH", "0") == "0":
  169. assert len(responses) == 1 and quote(path) in responses
  170. return status, responses
  171. def proppatch(self, path: str, data: Optional[str] = None,
  172. check: Optional[int] = 207, **kwargs
  173. ) -> Tuple[int, RESPONSES]:
  174. status, _, answer = self.request("PROPPATCH", path, data, check=check,
  175. **kwargs)
  176. if status < 200 or 300 <= status:
  177. return status, {}
  178. assert answer is not None
  179. responses = self.parse_responses(answer)
  180. assert len(responses) == 1 and path in responses
  181. return status, responses
  182. def report(self, path: str, data: str, check: Optional[int] = 207,
  183. is_xml: Optional[bool] = True,
  184. **kwargs) -> Tuple[int, RESPONSES]:
  185. status, _, answer = self.request("REPORT", path, data, check=check,
  186. **kwargs)
  187. if status < 200 or 300 <= status:
  188. return status, {}
  189. assert answer is not None
  190. if is_xml:
  191. parsed = self.parse_responses(answer)
  192. else:
  193. parsed = self.parse_free_busy(answer)
  194. return status, parsed
  195. def delete(self, path: str, check: Optional[int] = 200, **kwargs
  196. ) -> Tuple[int, RESPONSES]:
  197. assert "data" not in kwargs
  198. status, _, answer = self.request("DELETE", path, check=check, **kwargs)
  199. if status < 200 or 300 <= status:
  200. return status, {}
  201. assert answer is not None
  202. responses = self.parse_responses(answer)
  203. assert len(responses) == 1 and path in responses
  204. return status, responses
  205. def mkcalendar(self, path: str, data: Optional[str] = None,
  206. check: Optional[int] = 201, **kwargs
  207. ) -> Tuple[int, str]:
  208. status, _, answer = self.request("MKCALENDAR", path, data, check=check,
  209. **kwargs)
  210. return status, answer
  211. def mkcol(self, path: str, data: Optional[str] = None,
  212. check: Optional[int] = 201, **kwargs) -> int:
  213. status, _, _ = self.request("MKCOL", path, data, check=check, **kwargs)
  214. return status
  215. def create_addressbook(self, path: str, check: Optional[int] = 201,
  216. **kwargs) -> int:
  217. assert "data" not in kwargs
  218. return self.mkcol(path, """\
  219. <?xml version="1.0" encoding="UTF-8" ?>
  220. <create xmlns="DAV:" xmlns:CR="urn:ietf:params:xml:ns:carddav">
  221. <set>
  222. <prop>
  223. <resourcetype>
  224. <collection />
  225. <CR:addressbook />
  226. </resourcetype>
  227. </prop>
  228. </set>
  229. </create>""", check=check, **kwargs)