__init__.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. # This file is part of Radicale Server - Calendar Server
  2. # Copyright © 2012-2017 Guillaume Ayoub
  3. # Copyright © 2017-2018 Unrud <unrud@outlook.com>
  4. #
  5. # This library is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # (at your option) any later version.
  9. #
  10. # This library is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
  17. """
  18. Tests for Radicale.
  19. """
  20. import base64
  21. import logging
  22. import shutil
  23. import sys
  24. import tempfile
  25. import xml.etree.ElementTree as ET
  26. from io import BytesIO
  27. from typing import Any, Dict, List, Optional, Tuple, Union
  28. import defusedxml.ElementTree as DefusedET
  29. import radicale
  30. from radicale import app, config, xmlutils
  # Shape of a parsed multistatus body as produced by
  # ``BaseTest.parse_responses``: each response href maps either to a bare
  # status code or to a mapping of property tag -> (status code, element).
  RESPONSES = Dict[
      str,
      Union[int, Dict[str, Tuple[int, ET.Element]]]
  ]

  # Make the Radicale logger emit debug output while the tests run
  radicale.log.logger.setLevel(logging.DEBUG)
  class BaseTest:
      """Base class for tests.

      ``setup`` builds an application whose storage lives in a temporary
      folder and ``teardown`` removes that folder again.  The remaining
      methods each send one request to the application and, by default,
      assert that the expected status code was returned.

      The ``check`` argument accepted by the request helpers is interpreted
      by ``_check_status``: ``True`` asserts the helper's usual success
      status, ``False`` disables the assertion and an integer asserts that
      exact status code instead.
      """

      colpath: str
      configuration: config.Configuration
      application: app.Application

      # NOTE(review): pytest 8 removed support for nose-style ``setup`` /
      # ``teardown`` methods in favour of ``setup_method`` /
      # ``teardown_method`` - confirm which pytest version runs this suite
      # before renaming, since subclasses may override these names.
      def setup(self) -> None:
          """Create the configuration, storage folder and application."""
          self.configuration = config.load()
          self.colpath = tempfile.mkdtemp()
          self.configuration.update({
              "storage": {"filesystem_folder": self.colpath,
                          # Disable syncing to disk for better performance
                          "_filesystem_fsync": "False"},
              # Set incorrect authentication delay to a short duration
              "auth": {"delay": "0.001"}}, "test", privileged=True)
          self.application = app.Application(self.configuration)

      def teardown(self) -> None:
          """Remove the temporary storage folder created by ``setup``."""
          shutil.rmtree(self.colpath)

      def request(self, method: str, path: str, data: Optional[str] = None,
                  **kwargs) -> Tuple[int, Dict[str, str], str]:
          """Send a request.

          ``login`` (if given) is removed from ``kwargs`` and sent as HTTP
          basic authentication.  Every other keyword argument becomes a WSGI
          environ entry with its key upper-cased.

          Returns a tuple of the numeric status code, the response headers
          as a dict and the response body decoded as UTF-8.

          Raises ``TypeError`` if ``login`` or any environ value is not a
          ``str``.
          """
          login = kwargs.pop("login", None)
          if login is not None and not isinstance(login, str):
              raise TypeError("login argument must be %r, not %r" %
                              (str, type(login)))
          environ: Dict[str, Any] = {
              k.upper(): v for k, v in kwargs.items()}
          for k, v in environ.items():
              if not isinstance(v, str):
                  raise TypeError("type of %r is %r, expected %r" %
                                  (k, type(v), str))
          encoding: str = self.configuration.get("encoding", "request")
          if login:
              environ["HTTP_AUTHORIZATION"] = "Basic " + base64.b64encode(
                  login.encode(encoding)).decode()
          environ["REQUEST_METHOD"] = method.upper()
          environ["PATH_INFO"] = path
          # No body (or an empty one) means no "wsgi.input" entry at all
          if data:
              data_bytes = data.encode(encoding)
              environ["wsgi.input"] = BytesIO(data_bytes)
              environ["CONTENT_LENGTH"] = str(len(data_bytes))
          environ["wsgi.errors"] = sys.stderr
          status: Optional[str] = None
          headers: Optional[List[Tuple[str, str]]] = None

          def start_response(status_: str, headers_: List[Tuple[str, str]]
                             ) -> None:
              # Capture what the application reports for the caller
              nonlocal status, headers
              status = status_
              headers = headers_

          answers = list(self.application(environ, start_response))
          assert status is not None and headers is not None
          # A WSGI application may return its body in several chunks; join
          # them all instead of silently dropping everything after the first
          return (int(status.split()[0]), dict(headers),
                  b"".join(answers).decode())

      @staticmethod
      def parse_responses(text: str) -> RESPONSES:
          """Parse a ``D:multistatus`` XML body into a per-href mapping.

          For each ``D:response`` the result maps the href text either to
          the status code of a response-level ``D:status`` element or, when
          there is none, to a dict of human-readable property tag ->
          (status code, property element) collected from its ``D:propstat``
          children.

          Fails with ``AssertionError`` if the document does not have the
          expected structure or contains duplicate hrefs or properties.
          """

          def status_code_of(status: ET.Element) -> int:
              # Status lines look like "HTTP/1.1 <code> <reason>"
              status_text = status.text
              assert status_text is not None
              assert status_text.startswith("HTTP/1.1 ")
              return int(status_text.split(" ")[1])

          xml = DefusedET.fromstring(text)
          assert xml.tag == xmlutils.make_clark("D:multistatus")
          path_responses: RESPONSES = {}
          for response in xml.findall(xmlutils.make_clark("D:response")):
              href = response.find(xmlutils.make_clark("D:href"))
              assert href is not None and href.text is not None, (
                  "response without href")
              assert href.text not in path_responses
              prop_responses: Dict[str, Tuple[int, ET.Element]] = {}
              for propstat in response.findall(
                      xmlutils.make_clark("D:propstat")):
                  status = propstat.find(xmlutils.make_clark("D:status"))
                  assert status is not None, "propstat without status"
                  status_code = status_code_of(status)
                  for element in propstat.findall(
                          "./%s/*" % xmlutils.make_clark("D:prop")):
                      human_tag = xmlutils.make_human_tag(element.tag)
                      assert human_tag not in prop_responses
                      prop_responses[human_tag] = (status_code, element)
              status = response.find(xmlutils.make_clark("D:status"))
              if status is not None:
                  # A response-level status excludes per-property results
                  assert not prop_responses
                  path_responses[href.text] = status_code_of(status)
              else:
                  path_responses[href.text] = prop_responses
          return path_responses

      @staticmethod
      def _check_status(status: int, good_status: int,
                        check: Union[bool, int] = True) -> bool:
          """Assert the status code as requested by ``check``.

          ``check=True`` asserts ``status == good_status``, an integer
          asserts ``status == check`` and ``check=False`` asserts nothing.

          Returns whether ``status`` equals ``good_status``.
          """
          if check is not False:
              expected = good_status if check is True else check
              assert status == expected, "%d != %d" % (status, expected)
          return status == good_status

      def get(self, path: str, check: Union[bool, int] = True, **kwargs
              ) -> Tuple[int, str]:
          """Send a GET request (success status 200); return status, body."""
          assert "data" not in kwargs
          status, _, answer = self.request("GET", path, **kwargs)
          self._check_status(status, 200, check)
          return status, answer

      def post(self, path: str, data: Optional[str] = None,
               check: Union[bool, int] = True, **kwargs
               ) -> Tuple[int, str]:
          """Send a POST request (success status 200); return status, body."""
          status, _, answer = self.request("POST", path, data, **kwargs)
          self._check_status(status, 200, check)
          return status, answer

      def put(self, path: str, data: str, check: Union[bool, int] = True,
              **kwargs) -> Tuple[int, str]:
          """Send a PUT request (success status 201); return status, body."""
          status, _, answer = self.request("PUT", path, data, **kwargs)
          self._check_status(status, 201, check)
          return status, answer

      def propfind(self, path: str, data: Optional[str] = None,
                   check: Union[bool, int] = True, **kwargs
                   ) -> Tuple[int, RESPONSES]:
          """Send a PROPFIND request (success status 207).

          Returns the status and the parsed multistatus responses, or an
          empty dict when the status is not 207.  For depth "0" (the
          default) exactly one response, for ``path`` itself, is expected.
          """
          status, _, answer = self.request("PROPFIND", path, data, **kwargs)
          if not self._check_status(status, 207, check):
              return status, {}
          assert answer is not None
          responses = self.parse_responses(answer)
          # ``request`` upper-cases environ keys, so accept any spelling of
          # the depth header here as well
          depth = "0"
          for key, value in kwargs.items():
              if key.upper() == "HTTP_DEPTH":
                  depth = value
          if depth == "0":
              assert len(responses) == 1 and path in responses
          return status, responses

      def proppatch(self, path: str, data: Optional[str] = None,
                    check: Union[bool, int] = True, **kwargs
                    ) -> Tuple[int, RESPONSES]:
          """Send a PROPPATCH request (success status 207).

          Returns the status and the parsed multistatus responses, or an
          empty dict when the status is not 207.  Exactly one response, for
          ``path`` itself, is expected.
          """
          status, _, answer = self.request("PROPPATCH", path, data, **kwargs)
          if not self._check_status(status, 207, check):
              return status, {}
          assert answer is not None
          responses = self.parse_responses(answer)
          assert len(responses) == 1 and path in responses
          return status, responses

      def report(self, path: str, data: str, check: Union[bool, int] = True,
                 **kwargs) -> Tuple[int, RESPONSES]:
          """Send a REPORT request (success status 207).

          Returns the status and the parsed multistatus responses, or an
          empty dict when the status is not 207.
          """
          status, _, answer = self.request("REPORT", path, data, **kwargs)
          if not self._check_status(status, 207, check):
              return status, {}
          assert answer is not None
          return status, self.parse_responses(answer)

      def delete(self, path: str, check: Union[bool, int] = True, **kwargs
                 ) -> Tuple[int, RESPONSES]:
          """Send a DELETE request (success status 200).

          Returns the status and the parsed multistatus responses, or an
          empty dict when the status is not 200.  Exactly one response, for
          ``path`` itself, is expected.
          """
          assert "data" not in kwargs
          status, _, answer = self.request("DELETE", path, **kwargs)
          if not self._check_status(status, 200, check):
              return status, {}
          assert answer is not None
          responses = self.parse_responses(answer)
          assert len(responses) == 1 and path in responses
          return status, responses

      def mkcalendar(self, path: str, data: Optional[str] = None,
                     check: Union[bool, int] = True, **kwargs
                     ) -> Tuple[int, str]:
          """Send a MKCALENDAR request (success status 201).

          Returns the status and the response body.
          """
          status, _, answer = self.request(
              "MKCALENDAR", path, data, **kwargs)
          self._check_status(status, 201, check)
          return status, answer

      def mkcol(self, path: str, data: Optional[str] = None,
                check: Union[bool, int] = True, **kwargs) -> int:
          """Send a MKCOL request (success status 201); return the status."""
          status, _, _ = self.request("MKCOL", path, data, **kwargs)
          self._check_status(status, 201, check)
          return status

      def create_addressbook(self, path: str,
                             check: Union[bool, int] = True,
                             **kwargs) -> int:
          """Create an address book at ``path`` via MKCOL.

          The request body sets the resource type to a collection that is
          also a CardDAV address book.  Returns the status code.
          """
          assert "data" not in kwargs
          body = (
              '<?xml version="1.0" encoding="UTF-8" ?>\n'
              '<create xmlns="DAV:" '
              'xmlns:CR="urn:ietf:params:xml:ns:carddav">\n'
              '<set>\n'
              '<prop>\n'
              '<resourcetype>\n'
              '<collection />\n'
              '<CR:addressbook />\n'
              '</resourcetype>\n'
              '</prop>\n'
              '</set>\n'
              '</create>')
          return self.mkcol(path, body, check=check, **kwargs)