__init__.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. # This file is part of Radicale - CalDAV and CardDAV server
  2. # Copyright © 2012-2017 Guillaume Ayoub
  3. # Copyright © 2017-2018 Unrud <unrud@outlook.com>
  4. #
  5. # This library is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # (at your option) any later version.
  9. #
  10. # This library is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
  17. """
  18. Tests for Radicale.
  19. """
  20. import base64
  21. import logging
  22. import shutil
  23. import sys
  24. import tempfile
  25. import xml.etree.ElementTree as ET
  26. from io import BytesIO
  27. from typing import Any, Dict, List, Optional, Tuple, Union
  28. import defusedxml.ElementTree as DefusedET
  29. import radicale
  30. from radicale import app, config, types, xmlutils
# Shape of a parsed multistatus body as built by BaseTest.parse_responses:
# each response href maps either to a bare HTTP status code (response-level
# status) or to a dict of property tag (as returned by
# xmlutils.make_human_tag) -> (propstat status code, property element).
RESPONSES = Dict[str, Union[int, Dict[str, Tuple[int, ET.Element]]]]

# Enable debug output
radicale.log.logger.setLevel(logging.DEBUG)
  34. class BaseTest:
  35. """Base class for tests."""
  36. colpath: str
  37. configuration: config.Configuration
  38. application: app.Application
  39. def setup(self) -> None:
  40. self.configuration = config.load()
  41. self.colpath = tempfile.mkdtemp()
  42. self.configure({
  43. "storage": {"filesystem_folder": self.colpath,
  44. # Disable syncing to disk for better performance
  45. "_filesystem_fsync": "False"},
  46. # Set incorrect authentication delay to a short duration
  47. "auth": {"delay": "0.001"}})
  48. def configure(self, config_: types.CONFIG) -> None:
  49. self.configuration.update(config_, "test", privileged=True)
  50. self.application = app.Application(self.configuration)
  51. def teardown(self) -> None:
  52. shutil.rmtree(self.colpath)
  53. def request(self, method: str, path: str, data: Optional[str] = None,
  54. **kwargs) -> Tuple[int, Dict[str, str], str]:
  55. """Send a request."""
  56. login = kwargs.pop("login", None)
  57. if login is not None and not isinstance(login, str):
  58. raise TypeError("login argument must be %r, not %r" %
  59. (str, type(login)))
  60. environ: Dict[str, Any] = {k.upper(): v for k, v in kwargs.items()}
  61. for k, v in environ.items():
  62. if not isinstance(v, str):
  63. raise TypeError("type of %r is %r, expected %r" %
  64. (k, type(v), str))
  65. encoding: str = self.configuration.get("encoding", "request")
  66. if login:
  67. environ["HTTP_AUTHORIZATION"] = "Basic " + base64.b64encode(
  68. login.encode(encoding)).decode()
  69. environ["REQUEST_METHOD"] = method.upper()
  70. environ["PATH_INFO"] = path
  71. if data:
  72. data_bytes = data.encode(encoding)
  73. environ["wsgi.input"] = BytesIO(data_bytes)
  74. environ["CONTENT_LENGTH"] = str(len(data_bytes))
  75. environ["wsgi.errors"] = sys.stderr
  76. status = headers = None
  77. def start_response(status_: str, headers_: List[Tuple[str, str]]
  78. ) -> None:
  79. nonlocal status, headers
  80. status = status_
  81. headers = headers_
  82. answers = list(self.application(environ, start_response))
  83. assert status is not None and headers is not None
  84. return (int(status.split()[0]), dict(headers),
  85. answers[0].decode() if answers else "")
  86. @staticmethod
  87. def parse_responses(text: str) -> RESPONSES:
  88. xml = DefusedET.fromstring(text)
  89. assert xml.tag == xmlutils.make_clark("D:multistatus")
  90. path_responses: Dict[str, Union[
  91. int, Dict[str, Tuple[int, ET.Element]]]] = {}
  92. for response in xml.findall(xmlutils.make_clark("D:response")):
  93. href = response.find(xmlutils.make_clark("D:href"))
  94. assert href.text not in path_responses
  95. prop_respones: Dict[str, Tuple[int, ET.Element]] = {}
  96. for propstat in response.findall(
  97. xmlutils.make_clark("D:propstat")):
  98. status = propstat.find(xmlutils.make_clark("D:status"))
  99. assert status.text.startswith("HTTP/1.1 ")
  100. status_code = int(status.text.split(" ")[1])
  101. for element in propstat.findall(
  102. "./%s/*" % xmlutils.make_clark("D:prop")):
  103. human_tag = xmlutils.make_human_tag(element.tag)
  104. assert human_tag not in prop_respones
  105. prop_respones[human_tag] = (status_code, element)
  106. status = response.find(xmlutils.make_clark("D:status"))
  107. if status is not None:
  108. assert not prop_respones
  109. assert status.text.startswith("HTTP/1.1 ")
  110. status_code = int(status.text.split(" ")[1])
  111. path_responses[href.text] = status_code
  112. else:
  113. path_responses[href.text] = prop_respones
  114. return path_responses
  115. @staticmethod
  116. def _check_status(status: int, good_status: int,
  117. check: Union[bool, int] = True) -> bool:
  118. if check is not False:
  119. expected = good_status if check is True else check
  120. assert status == expected, "%d != %d" % (status, expected)
  121. return status == good_status
  122. def get(self, path: str, check: Union[bool, int] = True, **kwargs
  123. ) -> Tuple[int, str]:
  124. assert "data" not in kwargs
  125. status, _, answer = self.request("GET", path, **kwargs)
  126. self._check_status(status, 200, check)
  127. return status, answer
  128. def post(self, path: str, data: str = None, check: Union[bool, int] = True,
  129. **kwargs) -> Tuple[int, str]:
  130. status, _, answer = self.request("POST", path, data, **kwargs)
  131. self._check_status(status, 200, check)
  132. return status, answer
  133. def put(self, path: str, data: str, check: Union[bool, int] = True,
  134. **kwargs) -> Tuple[int, str]:
  135. status, _, answer = self.request("PUT", path, data, **kwargs)
  136. self._check_status(status, 201, check)
  137. return status, answer
  138. def propfind(self, path: str, data: Optional[str] = None,
  139. check: Union[bool, int] = True, **kwargs
  140. ) -> Tuple[int, RESPONSES]:
  141. status, _, answer = self.request("PROPFIND", path, data, **kwargs)
  142. if not self._check_status(status, 207, check):
  143. return status, {}
  144. assert answer is not None
  145. responses = self.parse_responses(answer)
  146. if kwargs.get("HTTP_DEPTH", "0") == "0":
  147. assert len(responses) == 1 and path in responses
  148. return status, responses
  149. def proppatch(self, path: str, data: Optional[str] = None,
  150. check: Union[bool, int] = True, **kwargs
  151. ) -> Tuple[int, RESPONSES]:
  152. status, _, answer = self.request("PROPPATCH", path, data, **kwargs)
  153. if not self._check_status(status, 207, check):
  154. return status, {}
  155. assert answer is not None
  156. responses = self.parse_responses(answer)
  157. assert len(responses) == 1 and path in responses
  158. return status, responses
  159. def report(self, path: str, data: str, check: Union[bool, int] = True,
  160. **kwargs) -> Tuple[int, RESPONSES]:
  161. status, _, answer = self.request("REPORT", path, data, **kwargs)
  162. if not self._check_status(status, 207, check):
  163. return status, {}
  164. assert answer is not None
  165. return status, self.parse_responses(answer)
  166. def delete(self, path: str, check: Union[bool, int] = True, **kwargs
  167. ) -> Tuple[int, RESPONSES]:
  168. assert "data" not in kwargs
  169. status, _, answer = self.request("DELETE", path, **kwargs)
  170. if not self._check_status(status, 200, check):
  171. return status, {}
  172. assert answer is not None
  173. responses = self.parse_responses(answer)
  174. assert len(responses) == 1 and path in responses
  175. return status, responses
  176. def mkcalendar(self, path: str, data: Optional[str] = None,
  177. check: Union[bool, int] = True, **kwargs
  178. ) -> Tuple[int, str]:
  179. status, _, answer = self.request("MKCALENDAR", path, data, **kwargs)
  180. self._check_status(status, 201, check)
  181. return status, answer
  182. def mkcol(self, path: str, data: Optional[str] = None,
  183. check: Union[bool, int] = True, **kwargs) -> int:
  184. status, _, _ = self.request("MKCOL", path, data, **kwargs)
  185. self._check_status(status, 201, check)
  186. return status
  187. def create_addressbook(self, path: str, check: Union[bool, int] = True,
  188. **kwargs) -> int:
  189. assert "data" not in kwargs
  190. return self.mkcol(path, """\
  191. <?xml version="1.0" encoding="UTF-8" ?>
  192. <create xmlns="DAV:" xmlns:CR="urn:ietf:params:xml:ns:carddav">
  193. <set>
  194. <prop>
  195. <resourcetype>
  196. <collection />
  197. <CR:addressbook />
  198. </resourcetype>
  199. </prop>
  200. </set>
  201. </create>""", check=check, **kwargs)