__init__.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. # This file is part of Radicale - CalDAV and CardDAV server
  2. # Copyright © 2012-2017 Guillaume Ayoub
  3. # Copyright © 2017-2023 Unrud <unrud@outlook.com>
  4. # Copyright © 2024-2025 Peter Bieringer <pb@bieringer.de>
  5. #
  6. # This library is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This library is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. Tests for Radicale.
  20. """
  21. import base64
  22. import logging
  23. import shutil
  24. import sys
  25. import tempfile
  26. import wsgiref.util
  27. import xml.etree.ElementTree as ET
  28. from io import BytesIO
  29. from typing import Any, Dict, List, Optional, Tuple, Union
  30. from urllib.parse import quote
  31. import defusedxml.ElementTree as DefusedET
  32. import vobject
  33. import radicale
  34. from radicale import app, config, types, xmlutils
  35. RESPONSES = Dict[str, Union[int, Dict[str, Tuple[int, ET.Element]], vobject.base.Component]]
  36. # Enable debug output
  37. radicale.log.logger.setLevel(logging.DEBUG)
  38. class BaseTest:
  39. """Base class for tests."""
  40. colpath: str
  41. configuration: config.Configuration
  42. application: app.Application
  43. def setup_method(self) -> None:
  44. self.configuration = config.load()
  45. self.colpath = tempfile.mkdtemp()
  46. self.configure({
  47. "storage": {"filesystem_folder": self.colpath,
  48. # Disable syncing to disk for better performance
  49. "_filesystem_fsync": "False"},
  50. # Set incorrect authentication delay to a short duration
  51. "auth": {"delay": "0.001"}})
  52. def configure(self, config_: types.CONFIG) -> None:
  53. self.configuration.update(config_, "test", privileged=True)
  54. self.application = app.Application(self.configuration)
  55. def teardown_method(self) -> None:
  56. shutil.rmtree(self.colpath)
  57. def request(self, method: str, path: str, data: Optional[str] = None,
  58. check: Optional[int] = None, **kwargs
  59. ) -> Tuple[int, Dict[str, str], str]:
  60. """Send a request."""
  61. login = kwargs.pop("login", None)
  62. if login is not None and not isinstance(login, str):
  63. raise TypeError("login argument must be %r, not %r" %
  64. (str, type(login)))
  65. http_if_match = kwargs.pop("http_if_match", None)
  66. if http_if_match is not None and not isinstance(http_if_match, str):
  67. raise TypeError("http_if_match argument must be %r, not %r" %
  68. (str, type(http_if_match)))
  69. remote_useragent = kwargs.pop("remote_useragent", None)
  70. remote_host = kwargs.pop("remote_host", None)
  71. environ: Dict[str, Any] = {k.upper(): v for k, v in kwargs.items()}
  72. for k, v in environ.items():
  73. if not isinstance(v, str):
  74. raise TypeError("type of %r is %r, expected %r" %
  75. (k, type(v), str))
  76. encoding: str = self.configuration.get("encoding", "request")
  77. if login:
  78. environ["HTTP_AUTHORIZATION"] = "Basic " + base64.b64encode(
  79. login.encode(encoding)).decode()
  80. if http_if_match:
  81. environ["HTTP_IF_MATCH"] = http_if_match
  82. if remote_useragent:
  83. environ["HTTP_USER_AGENT"] = remote_useragent
  84. if remote_host:
  85. environ["REMOTE_ADDR"] = remote_host
  86. environ["REQUEST_METHOD"] = method.upper()
  87. environ["PATH_INFO"] = path
  88. if data is not None:
  89. data_bytes = data.encode(encoding)
  90. environ["wsgi.input"] = BytesIO(data_bytes)
  91. environ["CONTENT_LENGTH"] = str(len(data_bytes))
  92. environ["wsgi.errors"] = sys.stderr
  93. wsgiref.util.setup_testing_defaults(environ)
  94. status = headers = None
  95. def start_response(status_: str, headers_: List[Tuple[str, str]]
  96. ) -> None:
  97. nonlocal status, headers
  98. status = int(status_.split()[0])
  99. headers = dict(headers_)
  100. answers = list(self.application(environ, start_response))
  101. assert status is not None and headers is not None
  102. assert check is None or status == check, "%d != %d" % (status, check)
  103. return status, headers, answers[0].decode() if answers else ""
  104. @staticmethod
  105. def parse_responses(text: str) -> RESPONSES:
  106. xml = DefusedET.fromstring(text)
  107. assert xml.tag == xmlutils.make_clark("D:multistatus")
  108. path_responses: RESPONSES = {}
  109. for response in xml.findall(xmlutils.make_clark("D:response")):
  110. href = response.find(xmlutils.make_clark("D:href"))
  111. assert href.text not in path_responses
  112. prop_responses: Dict[str, Tuple[int, ET.Element]] = {}
  113. for propstat in response.findall(
  114. xmlutils.make_clark("D:propstat")):
  115. status = propstat.find(xmlutils.make_clark("D:status"))
  116. assert status.text.startswith("HTTP/1.1 ")
  117. status_code = int(status.text.split(" ")[1])
  118. for element in propstat.findall(
  119. "./%s/*" % xmlutils.make_clark("D:prop")):
  120. human_tag = xmlutils.make_human_tag(element.tag)
  121. assert human_tag not in prop_responses
  122. prop_responses[human_tag] = (status_code, element)
  123. status = response.find(xmlutils.make_clark("D:status"))
  124. if status is not None:
  125. assert not prop_responses
  126. assert status.text.startswith("HTTP/1.1 ")
  127. status_code = int(status.text.split(" ")[1])
  128. path_responses[href.text] = status_code
  129. else:
  130. path_responses[href.text] = prop_responses
  131. return path_responses
  132. @staticmethod
  133. def parse_free_busy(text: str) -> RESPONSES:
  134. path_responses: RESPONSES = {}
  135. path_responses[""] = vobject.readOne(text)
  136. return path_responses
  137. def get(self, path: str, check: Optional[int] = 200, **kwargs
  138. ) -> Tuple[int, str]:
  139. assert "data" not in kwargs
  140. status, _, answer = self.request("GET", path, check=check, **kwargs)
  141. return status, answer
  142. def post(self, path: str, data: Optional[str] = None,
  143. check: Optional[int] = 200, **kwargs) -> Tuple[int, str]:
  144. status, _, answer = self.request("POST", path, data, check=check,
  145. **kwargs)
  146. return status, answer
  147. def put(self, path: str, data: str, check: Optional[int] = 201,
  148. **kwargs) -> Tuple[int, str]:
  149. status, _, answer = self.request("PUT", path, data, check=check,
  150. **kwargs)
  151. return status, answer
  152. def propfind(self, path: str, data: Optional[str] = None,
  153. check: Optional[int] = 207, **kwargs
  154. ) -> Tuple[int, RESPONSES]:
  155. status, _, answer = self.request("PROPFIND", path, data, check=check,
  156. **kwargs)
  157. if status < 200 or 300 <= status:
  158. return status, {}
  159. assert answer is not None
  160. responses = self.parse_responses(answer)
  161. if kwargs.get("HTTP_DEPTH", "0") == "0":
  162. assert len(responses) == 1 and quote(path) in responses
  163. return status, responses
  164. def proppatch(self, path: str, data: Optional[str] = None,
  165. check: Optional[int] = 207, **kwargs
  166. ) -> Tuple[int, RESPONSES]:
  167. status, _, answer = self.request("PROPPATCH", path, data, check=check,
  168. **kwargs)
  169. if status < 200 or 300 <= status:
  170. return status, {}
  171. assert answer is not None
  172. responses = self.parse_responses(answer)
  173. assert len(responses) == 1 and path in responses
  174. return status, responses
  175. def report(self, path: str, data: str, check: Optional[int] = 207,
  176. is_xml: Optional[bool] = True,
  177. **kwargs) -> Tuple[int, RESPONSES]:
  178. status, _, answer = self.request("REPORT", path, data, check=check,
  179. **kwargs)
  180. if status < 200 or 300 <= status:
  181. return status, {}
  182. assert answer is not None
  183. if is_xml:
  184. parsed = self.parse_responses(answer)
  185. else:
  186. parsed = self.parse_free_busy(answer)
  187. return status, parsed
  188. def delete(self, path: str, check: Optional[int] = 200, **kwargs
  189. ) -> Tuple[int, RESPONSES]:
  190. assert "data" not in kwargs
  191. status, _, answer = self.request("DELETE", path, check=check, **kwargs)
  192. if status < 200 or 300 <= status:
  193. return status, {}
  194. assert answer is not None
  195. responses = self.parse_responses(answer)
  196. assert len(responses) == 1 and path in responses
  197. return status, responses
  198. def mkcalendar(self, path: str, data: Optional[str] = None,
  199. check: Optional[int] = 201, **kwargs
  200. ) -> Tuple[int, str]:
  201. status, _, answer = self.request("MKCALENDAR", path, data, check=check,
  202. **kwargs)
  203. return status, answer
  204. def mkcol(self, path: str, data: Optional[str] = None,
  205. check: Optional[int] = 201, **kwargs) -> int:
  206. status, _, _ = self.request("MKCOL", path, data, check=check, **kwargs)
  207. return status
  208. def create_addressbook(self, path: str, check: Optional[int] = 201,
  209. **kwargs) -> int:
  210. assert "data" not in kwargs
  211. return self.mkcol(path, """\
  212. <?xml version="1.0" encoding="UTF-8" ?>
  213. <create xmlns="DAV:" xmlns:CR="urn:ietf:params:xml:ns:carddav">
  214. <set>
  215. <prop>
  216. <resourcetype>
  217. <collection />
  218. <CR:addressbook />
  219. </resourcetype>
  220. </prop>
  221. </set>
  222. </create>""", check=check, **kwargs)