1
0

__init__.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. # This file is part of Radicale - CalDAV and CardDAV server
  2. # Copyright © 2012-2017 Guillaume Ayoub
  3. # Copyright © 2017-2018 Unrud <unrud@outlook.com>
  4. #
  5. # This library is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # (at your option) any later version.
  9. #
  10. # This library is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
  17. """
  18. Tests for Radicale.
  19. """
  20. import base64
  21. import logging
  22. import shutil
  23. import sys
  24. import tempfile
  25. import wsgiref.util
  26. import xml.etree.ElementTree as ET
  27. from io import BytesIO
  28. from typing import Any, Dict, List, Optional, Tuple, Union
  29. from urllib.parse import quote
  30. import defusedxml.ElementTree as DefusedET
  31. import vobject
  32. import radicale
  33. from radicale import app, config, types, xmlutils
  34. RESPONSES = Dict[str, Union[int, Dict[str, Tuple[int, ET.Element]], vobject.base.Component]]
  35. # Enable debug output
  36. radicale.log.logger.setLevel(logging.DEBUG)
  37. class BaseTest:
  38. """Base class for tests."""
  39. colpath: str
  40. configuration: config.Configuration
  41. application: app.Application
  42. def setup_method(self) -> None:
  43. self.configuration = config.load()
  44. self.colpath = tempfile.mkdtemp()
  45. self.configure({
  46. "storage": {"filesystem_folder": self.colpath,
  47. # Disable syncing to disk for better performance
  48. "_filesystem_fsync": "False"},
  49. # Set incorrect authentication delay to a short duration
  50. "auth": {"delay": "0.001"}})
  51. def configure(self, config_: types.CONFIG) -> None:
  52. self.configuration.update(config_, "test", privileged=True)
  53. self.application = app.Application(self.configuration)
  54. def teardown_method(self) -> None:
  55. shutil.rmtree(self.colpath)
  56. def request(self, method: str, path: str, data: Optional[str] = None,
  57. check: Optional[int] = None, **kwargs
  58. ) -> Tuple[int, Dict[str, str], str]:
  59. """Send a request."""
  60. login = kwargs.pop("login", None)
  61. if login is not None and not isinstance(login, str):
  62. raise TypeError("login argument must be %r, not %r" %
  63. (str, type(login)))
  64. http_if_match = kwargs.pop("http_if_match", None)
  65. if http_if_match is not None and not isinstance(http_if_match, str):
  66. raise TypeError("http_if_match argument must be %r, not %r" %
  67. (str, type(http_if_match)))
  68. environ: Dict[str, Any] = {k.upper(): v for k, v in kwargs.items()}
  69. for k, v in environ.items():
  70. if not isinstance(v, str):
  71. raise TypeError("type of %r is %r, expected %r" %
  72. (k, type(v), str))
  73. encoding: str = self.configuration.get("encoding", "request")
  74. if login:
  75. environ["HTTP_AUTHORIZATION"] = "Basic " + base64.b64encode(
  76. login.encode(encoding)).decode()
  77. if http_if_match:
  78. environ["HTTP_IF_MATCH"] = http_if_match
  79. environ["REQUEST_METHOD"] = method.upper()
  80. environ["PATH_INFO"] = path
  81. if data is not None:
  82. data_bytes = data.encode(encoding)
  83. environ["wsgi.input"] = BytesIO(data_bytes)
  84. environ["CONTENT_LENGTH"] = str(len(data_bytes))
  85. environ["wsgi.errors"] = sys.stderr
  86. wsgiref.util.setup_testing_defaults(environ)
  87. status = headers = None
  88. def start_response(status_: str, headers_: List[Tuple[str, str]]
  89. ) -> None:
  90. nonlocal status, headers
  91. status = int(status_.split()[0])
  92. headers = dict(headers_)
  93. answers = list(self.application(environ, start_response))
  94. assert status is not None and headers is not None
  95. assert check is None or status == check, "%d != %d" % (status, check)
  96. return status, headers, answers[0].decode() if answers else ""
  97. @staticmethod
  98. def parse_responses(text: str) -> RESPONSES:
  99. xml = DefusedET.fromstring(text)
  100. assert xml.tag == xmlutils.make_clark("D:multistatus")
  101. path_responses: RESPONSES = {}
  102. for response in xml.findall(xmlutils.make_clark("D:response")):
  103. href = response.find(xmlutils.make_clark("D:href"))
  104. assert href.text not in path_responses
  105. prop_responses: Dict[str, Tuple[int, ET.Element]] = {}
  106. for propstat in response.findall(
  107. xmlutils.make_clark("D:propstat")):
  108. status = propstat.find(xmlutils.make_clark("D:status"))
  109. assert status.text.startswith("HTTP/1.1 ")
  110. status_code = int(status.text.split(" ")[1])
  111. for element in propstat.findall(
  112. "./%s/*" % xmlutils.make_clark("D:prop")):
  113. human_tag = xmlutils.make_human_tag(element.tag)
  114. assert human_tag not in prop_responses
  115. prop_responses[human_tag] = (status_code, element)
  116. status = response.find(xmlutils.make_clark("D:status"))
  117. if status is not None:
  118. assert not prop_responses
  119. assert status.text.startswith("HTTP/1.1 ")
  120. status_code = int(status.text.split(" ")[1])
  121. path_responses[href.text] = status_code
  122. else:
  123. path_responses[href.text] = prop_responses
  124. return path_responses
  125. @staticmethod
  126. def parse_free_busy(text: str) -> RESPONSES:
  127. path_responses: RESPONSES = {}
  128. path_responses[""] = vobject.readOne(text)
  129. return path_responses
  130. def get(self, path: str, check: Optional[int] = 200, **kwargs
  131. ) -> Tuple[int, str]:
  132. assert "data" not in kwargs
  133. status, _, answer = self.request("GET", path, check=check, **kwargs)
  134. return status, answer
  135. def post(self, path: str, data: Optional[str] = None,
  136. check: Optional[int] = 200, **kwargs) -> Tuple[int, str]:
  137. status, _, answer = self.request("POST", path, data, check=check,
  138. **kwargs)
  139. return status, answer
  140. def put(self, path: str, data: str, check: Optional[int] = 201,
  141. **kwargs) -> Tuple[int, str]:
  142. status, _, answer = self.request("PUT", path, data, check=check,
  143. **kwargs)
  144. return status, answer
  145. def propfind(self, path: str, data: Optional[str] = None,
  146. check: Optional[int] = 207, **kwargs
  147. ) -> Tuple[int, RESPONSES]:
  148. status, _, answer = self.request("PROPFIND", path, data, check=check,
  149. **kwargs)
  150. if status < 200 or 300 <= status:
  151. return status, {}
  152. assert answer is not None
  153. responses = self.parse_responses(answer)
  154. if kwargs.get("HTTP_DEPTH", "0") == "0":
  155. assert len(responses) == 1 and quote(path) in responses
  156. return status, responses
  157. def proppatch(self, path: str, data: Optional[str] = None,
  158. check: Optional[int] = 207, **kwargs
  159. ) -> Tuple[int, RESPONSES]:
  160. status, _, answer = self.request("PROPPATCH", path, data, check=check,
  161. **kwargs)
  162. if status < 200 or 300 <= status:
  163. return status, {}
  164. assert answer is not None
  165. responses = self.parse_responses(answer)
  166. assert len(responses) == 1 and path in responses
  167. return status, responses
  168. def report(self, path: str, data: str, check: Optional[int] = 207,
  169. is_xml: Optional[bool] = True,
  170. **kwargs) -> Tuple[int, RESPONSES]:
  171. status, _, answer = self.request("REPORT", path, data, check=check,
  172. **kwargs)
  173. if status < 200 or 300 <= status:
  174. return status, {}
  175. assert answer is not None
  176. if is_xml:
  177. parsed = self.parse_responses(answer)
  178. else:
  179. parsed = self.parse_free_busy(answer)
  180. return status, parsed
  181. def delete(self, path: str, check: Optional[int] = 200, **kwargs
  182. ) -> Tuple[int, RESPONSES]:
  183. assert "data" not in kwargs
  184. status, _, answer = self.request("DELETE", path, check=check, **kwargs)
  185. if status < 200 or 300 <= status:
  186. return status, {}
  187. assert answer is not None
  188. responses = self.parse_responses(answer)
  189. assert len(responses) == 1 and path in responses
  190. return status, responses
  191. def mkcalendar(self, path: str, data: Optional[str] = None,
  192. check: Optional[int] = 201, **kwargs
  193. ) -> Tuple[int, str]:
  194. status, _, answer = self.request("MKCALENDAR", path, data, check=check,
  195. **kwargs)
  196. return status, answer
  197. def mkcol(self, path: str, data: Optional[str] = None,
  198. check: Optional[int] = 201, **kwargs) -> int:
  199. status, _, _ = self.request("MKCOL", path, data, check=check, **kwargs)
  200. return status
  201. def create_addressbook(self, path: str, check: Optional[int] = 201,
  202. **kwargs) -> int:
  203. assert "data" not in kwargs
  204. return self.mkcol(path, """\
  205. <?xml version="1.0" encoding="UTF-8" ?>
  206. <create xmlns="DAV:" xmlns:CR="urn:ietf:params:xml:ns:carddav">
  207. <set>
  208. <prop>
  209. <resourcetype>
  210. <collection />
  211. <CR:addressbook />
  212. </resourcetype>
  213. </prop>
  214. </set>
  215. </create>""", check=check, **kwargs)