__init__.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. # This file is part of Radicale - CalDAV and CardDAV server
  2. # Copyright © 2012-2017 Guillaume Ayoub
  3. # Copyright © 2017-2018 Unrud <unrud@outlook.com>
  4. #
  5. # This library is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # (at your option) any later version.
  9. #
  10. # This library is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
  17. """
  18. Tests for Radicale.
  19. """
  20. import base64
  21. import logging
  22. import shutil
  23. import sys
  24. import tempfile
  25. import wsgiref.util
  26. import xml.etree.ElementTree as ET
  27. from io import BytesIO
  28. from typing import Any, Dict, List, Optional, Tuple, Union
  29. import defusedxml.ElementTree as DefusedET
  30. import radicale
  31. from radicale import app, config, types, xmlutils
# Parsed multistatus result as built by ``BaseTest.parse_responses``: maps
# the text of each response href either to a bare status code (int) or to
# a dict of property tag (as returned by ``xmlutils.make_human_tag``) ->
# (status code, property element).
RESPONSES = Dict[str, Union[int, Dict[str, Tuple[int, ET.Element]]]]

# Enable debug output
radicale.log.logger.setLevel(logging.DEBUG)
class BaseTest:
    """Base class for tests."""

    # Temporary folder created in setup_method() (used as the storage
    # "filesystem_folder") and removed again in teardown_method().
    colpath: str
    # Configuration loaded in setup_method() and updated by configure().
    configuration: config.Configuration
    # Application under test; rebuilt by configure() from `configuration`.
    application: app.Application
  40. def setup_method(self) -> None:
  41. self.configuration = config.load()
  42. self.colpath = tempfile.mkdtemp()
  43. self.configure({
  44. "storage": {"filesystem_folder": self.colpath,
  45. # Disable syncing to disk for better performance
  46. "_filesystem_fsync": "False"},
  47. # Set incorrect authentication delay to a short duration
  48. "auth": {"delay": "0.001"}})
  49. def configure(self, config_: types.CONFIG) -> None:
  50. self.configuration.update(config_, "test", privileged=True)
  51. self.application = app.Application(self.configuration)
  52. def teardown_method(self) -> None:
  53. shutil.rmtree(self.colpath)
  54. def request(self, method: str, path: str, data: Optional[str] = None,
  55. check: Optional[int] = None, **kwargs
  56. ) -> Tuple[int, Dict[str, str], str]:
  57. """Send a request."""
  58. login = kwargs.pop("login", None)
  59. if login is not None and not isinstance(login, str):
  60. raise TypeError("login argument must be %r, not %r" %
  61. (str, type(login)))
  62. environ: Dict[str, Any] = {k.upper(): v for k, v in kwargs.items()}
  63. for k, v in environ.items():
  64. if not isinstance(v, str):
  65. raise TypeError("type of %r is %r, expected %r" %
  66. (k, type(v), str))
  67. encoding: str = self.configuration.get("encoding", "request")
  68. if login:
  69. environ["HTTP_AUTHORIZATION"] = "Basic " + base64.b64encode(
  70. login.encode(encoding)).decode()
  71. environ["REQUEST_METHOD"] = method.upper()
  72. environ["PATH_INFO"] = path
  73. if data is not None:
  74. data_bytes = data.encode(encoding)
  75. environ["wsgi.input"] = BytesIO(data_bytes)
  76. environ["CONTENT_LENGTH"] = str(len(data_bytes))
  77. environ["wsgi.errors"] = sys.stderr
  78. wsgiref.util.setup_testing_defaults(environ)
  79. status = headers = None
  80. def start_response(status_: str, headers_: List[Tuple[str, str]]
  81. ) -> None:
  82. nonlocal status, headers
  83. status = int(status_.split()[0])
  84. headers = dict(headers_)
  85. answers = list(self.application(environ, start_response))
  86. assert status is not None and headers is not None
  87. assert check is None or status == check, "%d != %d" % (status, check)
  88. return status, headers, answers[0].decode() if answers else ""
  89. @staticmethod
  90. def parse_responses(text: str) -> RESPONSES:
  91. xml = DefusedET.fromstring(text)
  92. assert xml.tag == xmlutils.make_clark("D:multistatus")
  93. path_responses: Dict[str, Union[
  94. int, Dict[str, Tuple[int, ET.Element]]]] = {}
  95. for response in xml.findall(xmlutils.make_clark("D:response")):
  96. href = response.find(xmlutils.make_clark("D:href"))
  97. assert href.text not in path_responses
  98. prop_respones: Dict[str, Tuple[int, ET.Element]] = {}
  99. for propstat in response.findall(
  100. xmlutils.make_clark("D:propstat")):
  101. status = propstat.find(xmlutils.make_clark("D:status"))
  102. assert status.text.startswith("HTTP/1.1 ")
  103. status_code = int(status.text.split(" ")[1])
  104. for element in propstat.findall(
  105. "./%s/*" % xmlutils.make_clark("D:prop")):
  106. human_tag = xmlutils.make_human_tag(element.tag)
  107. assert human_tag not in prop_respones
  108. prop_respones[human_tag] = (status_code, element)
  109. status = response.find(xmlutils.make_clark("D:status"))
  110. if status is not None:
  111. assert not prop_respones
  112. assert status.text.startswith("HTTP/1.1 ")
  113. status_code = int(status.text.split(" ")[1])
  114. path_responses[href.text] = status_code
  115. else:
  116. path_responses[href.text] = prop_respones
  117. return path_responses
  118. def get(self, path: str, check: Optional[int] = 200, **kwargs
  119. ) -> Tuple[int, str]:
  120. assert "data" not in kwargs
  121. status, _, answer = self.request("GET", path, check=check, **kwargs)
  122. return status, answer
  123. def post(self, path: str, data: Optional[str] = None,
  124. check: Optional[int] = 200, **kwargs) -> Tuple[int, str]:
  125. status, _, answer = self.request("POST", path, data, check=check,
  126. **kwargs)
  127. return status, answer
  128. def put(self, path: str, data: str, check: Optional[int] = 201,
  129. **kwargs) -> Tuple[int, str]:
  130. status, _, answer = self.request("PUT", path, data, check=check,
  131. **kwargs)
  132. return status, answer
  133. def propfind(self, path: str, data: Optional[str] = None,
  134. check: Optional[int] = 207, **kwargs
  135. ) -> Tuple[int, RESPONSES]:
  136. status, _, answer = self.request("PROPFIND", path, data, check=check,
  137. **kwargs)
  138. if status < 200 or 300 <= status:
  139. return status, {}
  140. assert answer is not None
  141. responses = self.parse_responses(answer)
  142. if kwargs.get("HTTP_DEPTH", "0") == "0":
  143. assert len(responses) == 1 and path in responses
  144. return status, responses
  145. def proppatch(self, path: str, data: Optional[str] = None,
  146. check: Optional[int] = 207, **kwargs
  147. ) -> Tuple[int, RESPONSES]:
  148. status, _, answer = self.request("PROPPATCH", path, data, check=check,
  149. **kwargs)
  150. if status < 200 or 300 <= status:
  151. return status, {}
  152. assert answer is not None
  153. responses = self.parse_responses(answer)
  154. assert len(responses) == 1 and path in responses
  155. return status, responses
  156. def report(self, path: str, data: str, check: Optional[int] = 207,
  157. **kwargs) -> Tuple[int, RESPONSES]:
  158. status, _, answer = self.request("REPORT", path, data, check=check,
  159. **kwargs)
  160. if status < 200 or 300 <= status:
  161. return status, {}
  162. assert answer is not None
  163. return status, self.parse_responses(answer)
  164. def delete(self, path: str, check: Optional[int] = 200, **kwargs
  165. ) -> Tuple[int, RESPONSES]:
  166. assert "data" not in kwargs
  167. status, _, answer = self.request("DELETE", path, check=check, **kwargs)
  168. if status < 200 or 300 <= status:
  169. return status, {}
  170. assert answer is not None
  171. responses = self.parse_responses(answer)
  172. assert len(responses) == 1 and path in responses
  173. return status, responses
  174. def mkcalendar(self, path: str, data: Optional[str] = None,
  175. check: Optional[int] = 201, **kwargs
  176. ) -> Tuple[int, str]:
  177. status, _, answer = self.request("MKCALENDAR", path, data, check=check,
  178. **kwargs)
  179. return status, answer
  180. def mkcol(self, path: str, data: Optional[str] = None,
  181. check: Optional[int] = 201, **kwargs) -> int:
  182. status, _, _ = self.request("MKCOL", path, data, check=check, **kwargs)
  183. return status
  184. def create_addressbook(self, path: str, check: Optional[int] = 201,
  185. **kwargs) -> int:
  186. assert "data" not in kwargs
  187. return self.mkcol(path, """\
  188. <?xml version="1.0" encoding="UTF-8" ?>
  189. <create xmlns="DAV:" xmlns:CR="urn:ietf:params:xml:ns:carddav">
  190. <set>
  191. <prop>
  192. <resourcetype>
  193. <collection />
  194. <CR:addressbook />
  195. </resourcetype>
  196. </prop>
  197. </set>
  198. </create>""", check=check, **kwargs)