__init__.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. # This file is part of Radicale - CalDAV and CardDAV server
  2. # Copyright © 2012-2017 Guillaume Ayoub
  3. # Copyright © 2017-2023 Unrud <unrud@outlook.com>
  4. # Copyright © 2024-2025 Peter Bieringer <pb@bieringer.de>
  5. #
  6. # This library is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This library is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. Tests for Radicale.
  20. """
  21. import base64
  22. import logging
  23. import shutil
  24. import sys
  25. import tempfile
  26. import wsgiref.util
  27. import xml.etree.ElementTree as ET
  28. from io import BytesIO
  29. from typing import Any, Dict, List, Optional, Tuple, Union
  30. from urllib.parse import quote
  31. import defusedxml.ElementTree as DefusedET
  32. import vobject
  33. import radicale
  34. from radicale import app, config, types, xmlutils
  # Type of the dictionaries returned by the response parsers below: each key
  # maps to an integer status code, to a dict of
  # ``{property tag: (status code, XML element)}``, or to a vobject component.
  RESPONSES = Dict[str, Union[int, Dict[str, Tuple[int, ET.Element]], vobject.base.Component]]
  # Enable debug output
  radicale.log.logger.setLevel(logging.DEBUG)
  38. class BaseTest:
  39. """Base class for tests."""
  40. colpath: str
  41. configuration: config.Configuration
  42. application: app.Application
  def setup_method(self) -> None:
      """Prepare a fresh configuration and storage folder for one test.

      Loads the configuration, creates a temporary directory for the
      collections and applies the test overrides through ``configure``,
      which also builds the application.
      """
      self.configuration = config.load()
      self.colpath = tempfile.mkdtemp()
      storage_overrides = {
          "filesystem_folder": self.colpath,
          # Disable syncing to disk for better performance
          "_filesystem_fsync": "False",
      }
      # Set incorrect authentication delay to a short duration
      auth_overrides = {"delay": "0.001"}
      self.configure({"storage": storage_overrides, "auth": auth_overrides})
  def configure(self, config_: types.CONFIG) -> None:
      """Merge ``config_`` into the configuration and rebuild the application.

      The update is applied with the source name ``"test"`` and
      ``privileged=True``; afterwards ``self.application`` is replaced by a
      new ``app.Application`` built from the updated configuration.
      """
      settings = self.configuration
      settings.update(config_, "test", privileged=True)
      self.application = app.Application(settings)
  def teardown_method(self) -> None:
      """Delete the temporary storage folder recorded in ``self.colpath``."""
      storage_dir = self.colpath
      shutil.rmtree(storage_dir)
  def request(self, method: str, path: str, data: Optional[str] = None,
              check: Optional[int] = None, **kwargs
              ) -> Tuple[int, Dict[str, str], str]:
      """Send a request to the application through its WSGI interface.

      Args:
          method: HTTP method; it is upper-cased for ``REQUEST_METHOD``.
          path: Value stored in ``PATH_INFO``.
          data: Optional request body; it is encoded with the configured
              request encoding and ``CONTENT_LENGTH`` is set accordingly.
          check: Expected status code, or ``None`` to skip the check.
          **kwargs: ``login`` (sent base64-encoded as HTTP Basic
              credentials) and ``http_if_match`` are handled specially.
              Every other keyword name is upper-cased and stored in the
              WSGI environ as is, so its value has to be a ``str``.

      Returns:
          ``(status, headers, body)`` where ``body`` is the decoded
          concatenation of all chunks returned by the application (an
          empty string if there are none).

      Raises:
          TypeError: If ``login``, ``http_if_match`` or any other keyword
              value is not a ``str``.
          AssertionError: If the application never called
              ``start_response`` or the status differs from ``check``.
      """
      login = kwargs.pop("login", None)
      if login is not None and not isinstance(login, str):
          raise TypeError("login argument must be %r, not %r" %
                          (str, type(login)))
      http_if_match = kwargs.pop("http_if_match", None)
      if http_if_match is not None and not isinstance(http_if_match, str):
          raise TypeError("http_if_match argument must be %r, not %r" %
                          (str, type(http_if_match)))
      environ: Dict[str, Any] = {k.upper(): v for k, v in kwargs.items()}
      for k, v in environ.items():
          if not isinstance(v, str):
              raise TypeError("type of %r is %r, expected %r" %
                              (k, type(v), str))
      encoding: str = self.configuration.get("encoding", "request")
      if login:
          environ["HTTP_AUTHORIZATION"] = "Basic " + base64.b64encode(
              login.encode(encoding)).decode()
      if http_if_match:
          environ["HTTP_IF_MATCH"] = http_if_match
      environ["REQUEST_METHOD"] = method.upper()
      environ["PATH_INFO"] = path
      if data is not None:
          data_bytes = data.encode(encoding)
          environ["wsgi.input"] = BytesIO(data_bytes)
          environ["CONTENT_LENGTH"] = str(len(data_bytes))
      environ["wsgi.errors"] = sys.stderr
      # Fill in every remaining key a WSGI application may expect
      wsgiref.util.setup_testing_defaults(environ)
      status = headers = None

      def start_response(status_: str, headers_: List[Tuple[str, str]],
                         exc_info: Any = None) -> None:
          # ``exc_info`` is the optional third argument an application may
          # pass according to PEP 3333; it is accepted and ignored here.
          nonlocal status, headers
          status = int(status_.split()[0])
          headers = dict(headers_)

      answers = list(self.application(environ, start_response))
      assert status is not None and headers is not None
      assert check is None or status == check, "%d != %d" % (status, check)
      # Join all chunks so a body split over several chunks is not cut off
      # after the first one.
      return status, headers, b"".join(answers).decode()
  @staticmethod
  def parse_responses(text: str) -> RESPONSES:
      """Parse a ``D:multistatus`` XML document into a dictionary.

      Each ``D:response`` is stored under the text of its ``D:href``. The
      value is the integer code of the response's own ``D:status`` when it
      has one; otherwise it is a dict mapping the human-readable tag of
      every property found in the ``D:propstat`` children to
      ``(status code, element)``.
      """

      def status_code_of(status_element) -> int:
          # Status lines have the form "HTTP/1.1 <code> ..."
          line = status_element.text
          assert line.startswith("HTTP/1.1 ")
          return int(line.split(" ")[1])

      root = DefusedET.fromstring(text)
      assert root.tag == xmlutils.make_clark("D:multistatus")
      parsed: RESPONSES = {}
      for response in root.findall(xmlutils.make_clark("D:response")):
          href = response.find(xmlutils.make_clark("D:href"))
          # Every href may appear only once per document
          assert href.text not in parsed
          props: Dict[str, Tuple[int, ET.Element]] = {}
          for propstat in response.findall(
                  xmlutils.make_clark("D:propstat")):
              code = status_code_of(
                  propstat.find(xmlutils.make_clark("D:status")))
              for element in propstat.findall(
                      "./%s/*" % xmlutils.make_clark("D:prop")):
                  tag = xmlutils.make_human_tag(element.tag)
                  assert tag not in props
                  props[tag] = (code, element)
          response_status = response.find(xmlutils.make_clark("D:status"))
          if response_status is None:
              parsed[href.text] = props
              continue
          # A response carries either its own status or propstats, not both
          assert not props
          parsed[href.text] = status_code_of(response_status)
      return parsed
  @staticmethod
  def parse_free_busy(text: str) -> RESPONSES:
      """Parse ``text`` with ``vobject.readOne``.

      The parsed component is returned as the only entry of a dictionary,
      stored under the empty string.
      """
      component = vobject.readOne(text)
      return {"": component}
  def get(self, path: str, check: Optional[int] = 200, **kwargs
          ) -> Tuple[int, str]:
      """Send a GET request and return ``(status, body)``.

      ``check`` is the expected status code (200 by default); additional
      keywords are forwarded to ``request``. A ``data`` keyword is not
      accepted.
      """
      assert "data" not in kwargs
      code, _headers, body = self.request(
          "GET", path, check=check, **kwargs)
      return code, body
  def post(self, path: str, data: Optional[str] = None,
           check: Optional[int] = 200, **kwargs) -> Tuple[int, str]:
      """Send a POST request and return ``(status, body)``.

      ``data`` is the optional request body and ``check`` the expected
      status code (200 by default); additional keywords are forwarded to
      ``request``.
      """
      code, _headers, body = self.request(
          "POST", path, data, check=check, **kwargs)
      return code, body
  def put(self, path: str, data: str, check: Optional[int] = 201,
          **kwargs) -> Tuple[int, str]:
      """Send a PUT request with body ``data`` and return ``(status, body)``.

      ``check`` is the expected status code (201 by default); additional
      keywords are forwarded to ``request``.
      """
      code, _headers, body = self.request(
          "PUT", path, data, check=check, **kwargs)
      return code, body
  def propfind(self, path: str, data: Optional[str] = None,
               check: Optional[int] = 207, **kwargs
               ) -> Tuple[int, RESPONSES]:
      """Send a PROPFIND request and return ``(status, responses)``.

      Args:
          path: Path of the requested collection or item.
          data: Optional request body.
          check: Expected status code (207 by default), ``None`` to skip.
          **kwargs: Forwarded to ``request``. The depth header may be given
              as ``HTTP_DEPTH`` or in any other letter case.

      Returns:
          The status and the parsed multistatus body. For a status outside
          the 2xx range the body is not parsed and ``{}`` is returned.

      Raises:
          AssertionError: If the depth is "0" (the default) and the answer
              does not consist of exactly one response for ``quote(path)``.
      """
      status, _, answer = self.request("PROPFIND", path, data, check=check,
                                       **kwargs)
      if status < 200 or 300 <= status:
          return status, {}
      assert answer is not None
      responses = self.parse_responses(answer)
      # ``request`` upper-cases keyword names before it stores them in the
      # environ, so the depth has to be looked up case-insensitively as
      # well. The last matching keyword wins, as it does in ``request``.
      depth = "0"
      for name, value in kwargs.items():
          if name.upper() == "HTTP_DEPTH":
              depth = value
      if depth == "0":
          assert len(responses) == 1 and quote(path) in responses
      return status, responses
  def proppatch(self, path: str, data: Optional[str] = None,
                check: Optional[int] = 207, **kwargs
                ) -> Tuple[int, RESPONSES]:
      """Send a PROPPATCH request and return ``(status, responses)``.

      Args:
          path: Path of the collection or item to modify.
          data: Optional request body.
          check: Expected status code (207 by default), ``None`` to skip.
          **kwargs: Forwarded to ``request``.

      Returns:
          The status and the parsed multistatus body. For a status outside
          the 2xx range the body is not parsed and ``{}`` is returned.

      Raises:
          AssertionError: If the answer does not consist of exactly one
              response for ``quote(path)``.
      """
      status, _, answer = self.request("PROPPATCH", path, data, check=check,
                                       **kwargs)
      if status < 200 or 300 <= status:
          return status, {}
      assert answer is not None
      responses = self.parse_responses(answer)
      # Compare against the percent-encoded path, exactly as ``propfind``
      # does, so that paths containing spaces or non-ASCII characters are
      # matched too. For plain paths ``quote(path) == path``.
      assert len(responses) == 1 and quote(path) in responses
      return status, responses
  def report(self, path: str, data: str, check: Optional[int] = 207,
             is_xml: Optional[bool] = True,
             **kwargs) -> Tuple[int, RESPONSES]:
      """Send a REPORT request and return ``(status, parsed body)``.

      The body is parsed with ``parse_responses`` when ``is_xml`` is true
      and with ``parse_free_busy`` otherwise. For a status outside the 2xx
      range the body is not parsed and ``{}`` is returned instead.
      """
      status, _, answer = self.request("REPORT", path, data, check=check,
                                       **kwargs)
      if not 200 <= status < 300:
          return status, {}
      assert answer is not None
      parser = self.parse_responses if is_xml else self.parse_free_busy
      return status, parser(answer)
  def delete(self, path: str, check: Optional[int] = 200, **kwargs
             ) -> Tuple[int, RESPONSES]:
      """Send a DELETE request and return ``(status, responses)``.

      Args:
          path: Path of the collection or item to delete.
          check: Expected status code (200 by default), ``None`` to skip.
          **kwargs: Forwarded to ``request``; ``data`` is not accepted.

      Returns:
          The status and the parsed multistatus body. For a status outside
          the 2xx range the body is not parsed and ``{}`` is returned.

      Raises:
          AssertionError: If ``data`` is passed or the answer does not
              consist of exactly one response for ``quote(path)``.
      """
      assert "data" not in kwargs
      status, _, answer = self.request("DELETE", path, check=check, **kwargs)
      if status < 200 or 300 <= status:
          return status, {}
      assert answer is not None
      responses = self.parse_responses(answer)
      # Compare against the percent-encoded path, exactly as ``propfind``
      # does, so that paths containing spaces or non-ASCII characters are
      # matched too. For plain paths ``quote(path) == path``.
      assert len(responses) == 1 and quote(path) in responses
      return status, responses
  def mkcalendar(self, path: str, data: Optional[str] = None,
                 check: Optional[int] = 201, **kwargs
                 ) -> Tuple[int, str]:
      """Send a MKCALENDAR request and return ``(status, body)``.

      ``data`` is the optional request body and ``check`` the expected
      status code (201 by default); additional keywords are forwarded to
      ``request``.
      """
      code, _headers, body = self.request(
          "MKCALENDAR", path, data, check=check, **kwargs)
      return code, body
  def mkcol(self, path: str, data: Optional[str] = None,
            check: Optional[int] = 201, **kwargs) -> int:
      """Send a MKCOL request and return only the status code.

      ``data`` is the optional request body and ``check`` the expected
      status code (201 by default); additional keywords are forwarded to
      ``request``.
      """
      code, _headers, _body = self.request(
          "MKCOL", path, data, check=check, **kwargs)
      return code
  def create_addressbook(self, path: str, check: Optional[int] = 201,
                         **kwargs) -> int:
      """Create an address book at ``path`` with an extended MKCOL request.

      The request body sets the resource type of the new collection to
      ``collection`` plus CardDAV ``addressbook``. ``check`` is the
      expected status code (201 by default); additional keywords are
      forwarded to ``mkcol``, whose status code is returned.
      """
      # The body is fixed by this helper, so callers must not supply one.
      assert "data" not in kwargs
      return self.mkcol(path, """\
<?xml version="1.0" encoding="UTF-8" ?>
<create xmlns="DAV:" xmlns:CR="urn:ietf:params:xml:ns:carddav">
<set>
<prop>
<resourcetype>
<collection />
<CR:addressbook />
</resourcetype>
</prop>
</set>
</create>""", check=check, **kwargs)