__init__.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. # This file is part of Radicale - CalDAV and CardDAV server
  2. # Copyright © 2012-2017 Guillaume Ayoub
  3. # Copyright © 2017-2018 Unrud <unrud@outlook.com>
  4. #
  5. # This library is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # (at your option) any later version.
  9. #
  10. # This library is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
  17. """
  18. Tests for Radicale.
  19. """
  20. import base64
  21. import logging
  22. import shutil
  23. import sys
  24. import tempfile
  25. import xml.etree.ElementTree as ET
  26. from io import BytesIO
  27. from typing import Any, Dict, List, Optional, Tuple, Union
  28. import defusedxml.ElementTree as DefusedET
  29. import radicale
  30. from radicale import app, config, types, xmlutils
  31. RESPONSES = Dict[str, Union[int, Dict[str, Tuple[int, ET.Element]]]]
  32. # Enable debug output
  33. radicale.log.logger.setLevel(logging.DEBUG)
  34. class BaseTest:
  35. """Base class for tests."""
  36. colpath: str
  37. configuration: config.Configuration
  38. application: app.Application
  39. def setup(self) -> None:
  40. self.configuration = config.load()
  41. self.colpath = tempfile.mkdtemp()
  42. self.configure({
  43. "storage": {"filesystem_folder": self.colpath,
  44. # Disable syncing to disk for better performance
  45. "_filesystem_fsync": "False"},
  46. # Set incorrect authentication delay to a short duration
  47. "auth": {"delay": "0.001"}})
  48. def configure(self, config_: types.CONFIG) -> None:
  49. self.configuration.update(config_, "test", privileged=True)
  50. self.application = app.Application(self.configuration)
  51. def teardown(self) -> None:
  52. shutil.rmtree(self.colpath)
  53. def request(self, method: str, path: str, data: Optional[str] = None,
  54. check: Optional[int] = None, **kwargs
  55. ) -> Tuple[int, Dict[str, str], str]:
  56. """Send a request."""
  57. login = kwargs.pop("login", None)
  58. if login is not None and not isinstance(login, str):
  59. raise TypeError("login argument must be %r, not %r" %
  60. (str, type(login)))
  61. environ: Dict[str, Any] = {k.upper(): v for k, v in kwargs.items()}
  62. for k, v in environ.items():
  63. if not isinstance(v, str):
  64. raise TypeError("type of %r is %r, expected %r" %
  65. (k, type(v), str))
  66. encoding: str = self.configuration.get("encoding", "request")
  67. if login:
  68. environ["HTTP_AUTHORIZATION"] = "Basic " + base64.b64encode(
  69. login.encode(encoding)).decode()
  70. environ["REQUEST_METHOD"] = method.upper()
  71. environ["PATH_INFO"] = path
  72. if data:
  73. data_bytes = data.encode(encoding)
  74. environ["wsgi.input"] = BytesIO(data_bytes)
  75. environ["CONTENT_LENGTH"] = str(len(data_bytes))
  76. environ["wsgi.errors"] = sys.stderr
  77. status = headers = None
  78. def start_response(status_: str, headers_: List[Tuple[str, str]]
  79. ) -> None:
  80. nonlocal status, headers
  81. status = int(status_.split()[0])
  82. headers = dict(headers_)
  83. answers = list(self.application(environ, start_response))
  84. assert status is not None and headers is not None
  85. assert check is None or status == check, "%d != %d" % (status, check)
  86. return status, headers, answers[0].decode() if answers else ""
  87. @staticmethod
  88. def parse_responses(text: str) -> RESPONSES:
  89. xml = DefusedET.fromstring(text)
  90. assert xml.tag == xmlutils.make_clark("D:multistatus")
  91. path_responses: Dict[str, Union[
  92. int, Dict[str, Tuple[int, ET.Element]]]] = {}
  93. for response in xml.findall(xmlutils.make_clark("D:response")):
  94. href = response.find(xmlutils.make_clark("D:href"))
  95. assert href.text not in path_responses
  96. prop_respones: Dict[str, Tuple[int, ET.Element]] = {}
  97. for propstat in response.findall(
  98. xmlutils.make_clark("D:propstat")):
  99. status = propstat.find(xmlutils.make_clark("D:status"))
  100. assert status.text.startswith("HTTP/1.1 ")
  101. status_code = int(status.text.split(" ")[1])
  102. for element in propstat.findall(
  103. "./%s/*" % xmlutils.make_clark("D:prop")):
  104. human_tag = xmlutils.make_human_tag(element.tag)
  105. assert human_tag not in prop_respones
  106. prop_respones[human_tag] = (status_code, element)
  107. status = response.find(xmlutils.make_clark("D:status"))
  108. if status is not None:
  109. assert not prop_respones
  110. assert status.text.startswith("HTTP/1.1 ")
  111. status_code = int(status.text.split(" ")[1])
  112. path_responses[href.text] = status_code
  113. else:
  114. path_responses[href.text] = prop_respones
  115. return path_responses
  116. def get(self, path: str, check: Optional[int] = 200, **kwargs
  117. ) -> Tuple[int, str]:
  118. assert "data" not in kwargs
  119. status, _, answer = self.request("GET", path, check=check, **kwargs)
  120. return status, answer
  121. def post(self, path: str, data: str = None, check: Optional[int] = 200,
  122. **kwargs) -> Tuple[int, str]:
  123. status, _, answer = self.request("POST", path, data, check=check,
  124. **kwargs)
  125. return status, answer
  126. def put(self, path: str, data: str, check: Optional[int] = 201,
  127. **kwargs) -> Tuple[int, str]:
  128. status, _, answer = self.request("PUT", path, data, check=check,
  129. **kwargs)
  130. return status, answer
  131. def propfind(self, path: str, data: Optional[str] = None,
  132. check: Optional[int] = 207, **kwargs
  133. ) -> Tuple[int, RESPONSES]:
  134. status, _, answer = self.request("PROPFIND", path, data, check=check,
  135. **kwargs)
  136. if status < 200 or 300 <= status:
  137. return status, {}
  138. assert answer is not None
  139. responses = self.parse_responses(answer)
  140. if kwargs.get("HTTP_DEPTH", "0") == "0":
  141. assert len(responses) == 1 and path in responses
  142. return status, responses
  143. def proppatch(self, path: str, data: Optional[str] = None,
  144. check: Optional[int] = 207, **kwargs
  145. ) -> Tuple[int, RESPONSES]:
  146. status, _, answer = self.request("PROPPATCH", path, data, check=check,
  147. **kwargs)
  148. if status < 200 or 300 <= status:
  149. return status, {}
  150. assert answer is not None
  151. responses = self.parse_responses(answer)
  152. assert len(responses) == 1 and path in responses
  153. return status, responses
  154. def report(self, path: str, data: str, check: Optional[int] = 207,
  155. **kwargs) -> Tuple[int, RESPONSES]:
  156. status, _, answer = self.request("REPORT", path, data, check=check,
  157. **kwargs)
  158. if status < 200 or 300 <= status:
  159. return status, {}
  160. assert answer is not None
  161. return status, self.parse_responses(answer)
  162. def delete(self, path: str, check: Optional[int] = 200, **kwargs
  163. ) -> Tuple[int, RESPONSES]:
  164. assert "data" not in kwargs
  165. status, _, answer = self.request("DELETE", path, check=check, **kwargs)
  166. if status < 200 or 300 <= status:
  167. return status, {}
  168. assert answer is not None
  169. responses = self.parse_responses(answer)
  170. assert len(responses) == 1 and path in responses
  171. return status, responses
  172. def mkcalendar(self, path: str, data: Optional[str] = None,
  173. check: Optional[int] = 201, **kwargs
  174. ) -> Tuple[int, str]:
  175. status, _, answer = self.request("MKCALENDAR", path, data, check=check,
  176. **kwargs)
  177. return status, answer
  178. def mkcol(self, path: str, data: Optional[str] = None,
  179. check: Optional[int] = 201, **kwargs) -> int:
  180. status, _, _ = self.request("MKCOL", path, data, check=check, **kwargs)
  181. return status
  182. def create_addressbook(self, path: str, check: Optional[int] = 201,
  183. **kwargs) -> int:
  184. assert "data" not in kwargs
  185. return self.mkcol(path, """\
  186. <?xml version="1.0" encoding="UTF-8" ?>
  187. <create xmlns="DAV:" xmlns:CR="urn:ietf:params:xml:ns:carddav">
  188. <set>
  189. <prop>
  190. <resourcetype>
  191. <collection />
  192. <CR:addressbook />
  193. </resourcetype>
  194. </prop>
  195. </set>
  196. </create>""", check=check, **kwargs)