__init__.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. # This file is part of Radicale - CalDAV and CardDAV server
  2. # Copyright © 2012-2017 Guillaume Ayoub
  3. # Copyright © 2017-2018 Unrud <unrud@outlook.com>
  4. #
  5. # This library is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # (at your option) any later version.
  9. #
  10. # This library is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
  17. """
  18. Tests for Radicale.
  19. """
  20. import base64
  21. import logging
  22. import shutil
  23. import sys
  24. import tempfile
  25. import wsgiref.util
  26. import xml.etree.ElementTree as ET
  27. from io import BytesIO
  28. from typing import Any, Dict, List, Optional, Tuple, Union
  29. from urllib.parse import quote
  30. import defusedxml.ElementTree as DefusedET
  31. import vobject
  32. import radicale
  33. from radicale import app, config, types, xmlutils
  34. RESPONSES = Dict[str, Union[int, Dict[str, Tuple[int, ET.Element]], vobject.base.Component]]
  35. # Enable debug output
  36. radicale.log.logger.setLevel(logging.DEBUG)
  37. class BaseTest:
  38. """Base class for tests."""
  39. colpath: str
  40. configuration: config.Configuration
  41. application: app.Application
  42. def setup_method(self) -> None:
  43. self.configuration = config.load()
  44. self.colpath = tempfile.mkdtemp()
  45. self.configure({
  46. "storage": {"filesystem_folder": self.colpath,
  47. # Disable syncing to disk for better performance
  48. "_filesystem_fsync": "False"},
  49. # Set incorrect authentication delay to a short duration
  50. "auth": {"delay": "0.001"}})
  51. def configure(self, config_: types.CONFIG) -> None:
  52. self.configuration.update(config_, "test", privileged=True)
  53. self.application = app.Application(self.configuration)
  54. def teardown_method(self) -> None:
  55. shutil.rmtree(self.colpath)
  56. def request(self, method: str, path: str, data: Optional[str] = None,
  57. check: Optional[int] = None, **kwargs
  58. ) -> Tuple[int, Dict[str, str], str]:
  59. """Send a request."""
  60. login = kwargs.pop("login", None)
  61. if login is not None and not isinstance(login, str):
  62. raise TypeError("login argument must be %r, not %r" %
  63. (str, type(login)))
  64. environ: Dict[str, Any] = {k.upper(): v for k, v in kwargs.items()}
  65. for k, v in environ.items():
  66. if not isinstance(v, str):
  67. raise TypeError("type of %r is %r, expected %r" %
  68. (k, type(v), str))
  69. encoding: str = self.configuration.get("encoding", "request")
  70. if login:
  71. environ["HTTP_AUTHORIZATION"] = "Basic " + base64.b64encode(
  72. login.encode(encoding)).decode()
  73. environ["REQUEST_METHOD"] = method.upper()
  74. environ["PATH_INFO"] = path
  75. if data is not None:
  76. data_bytes = data.encode(encoding)
  77. environ["wsgi.input"] = BytesIO(data_bytes)
  78. environ["CONTENT_LENGTH"] = str(len(data_bytes))
  79. environ["wsgi.errors"] = sys.stderr
  80. wsgiref.util.setup_testing_defaults(environ)
  81. status = headers = None
  82. def start_response(status_: str, headers_: List[Tuple[str, str]]
  83. ) -> None:
  84. nonlocal status, headers
  85. status = int(status_.split()[0])
  86. headers = dict(headers_)
  87. answers = list(self.application(environ, start_response))
  88. assert status is not None and headers is not None
  89. assert check is None or status == check, "%d != %d" % (status, check)
  90. return status, headers, answers[0].decode() if answers else ""
  91. @staticmethod
  92. def parse_responses(text: str) -> RESPONSES:
  93. xml = DefusedET.fromstring(text)
  94. assert xml.tag == xmlutils.make_clark("D:multistatus")
  95. path_responses: RESPONSES = {}
  96. for response in xml.findall(xmlutils.make_clark("D:response")):
  97. href = response.find(xmlutils.make_clark("D:href"))
  98. assert href.text not in path_responses
  99. prop_responses: Dict[str, Tuple[int, ET.Element]] = {}
  100. for propstat in response.findall(
  101. xmlutils.make_clark("D:propstat")):
  102. status = propstat.find(xmlutils.make_clark("D:status"))
  103. assert status.text.startswith("HTTP/1.1 ")
  104. status_code = int(status.text.split(" ")[1])
  105. for element in propstat.findall(
  106. "./%s/*" % xmlutils.make_clark("D:prop")):
  107. human_tag = xmlutils.make_human_tag(element.tag)
  108. assert human_tag not in prop_responses
  109. prop_responses[human_tag] = (status_code, element)
  110. status = response.find(xmlutils.make_clark("D:status"))
  111. if status is not None:
  112. assert not prop_responses
  113. assert status.text.startswith("HTTP/1.1 ")
  114. status_code = int(status.text.split(" ")[1])
  115. path_responses[href.text] = status_code
  116. else:
  117. path_responses[href.text] = prop_responses
  118. return path_responses
  119. @staticmethod
  120. def parse_free_busy(text: str) -> RESPONSES:
  121. path_responses: RESPONSES = {}
  122. path_responses[""] = vobject.readOne(text)
  123. return path_responses
  124. def get(self, path: str, check: Optional[int] = 200, **kwargs
  125. ) -> Tuple[int, str]:
  126. assert "data" not in kwargs
  127. status, _, answer = self.request("GET", path, check=check, **kwargs)
  128. return status, answer
  129. def post(self, path: str, data: Optional[str] = None,
  130. check: Optional[int] = 200, **kwargs) -> Tuple[int, str]:
  131. status, _, answer = self.request("POST", path, data, check=check,
  132. **kwargs)
  133. return status, answer
  134. def put(self, path: str, data: str, check: Optional[int] = 201,
  135. **kwargs) -> Tuple[int, str]:
  136. status, _, answer = self.request("PUT", path, data, check=check,
  137. **kwargs)
  138. return status, answer
  139. def propfind(self, path: str, data: Optional[str] = None,
  140. check: Optional[int] = 207, **kwargs
  141. ) -> Tuple[int, RESPONSES]:
  142. status, _, answer = self.request("PROPFIND", path, data, check=check,
  143. **kwargs)
  144. if status < 200 or 300 <= status:
  145. return status, {}
  146. assert answer is not None
  147. responses = self.parse_responses(answer)
  148. if kwargs.get("HTTP_DEPTH", "0") == "0":
  149. assert len(responses) == 1 and quote(path) in responses
  150. return status, responses
  151. def proppatch(self, path: str, data: Optional[str] = None,
  152. check: Optional[int] = 207, **kwargs
  153. ) -> Tuple[int, RESPONSES]:
  154. status, _, answer = self.request("PROPPATCH", path, data, check=check,
  155. **kwargs)
  156. if status < 200 or 300 <= status:
  157. return status, {}
  158. assert answer is not None
  159. responses = self.parse_responses(answer)
  160. assert len(responses) == 1 and path in responses
  161. return status, responses
  162. def report(self, path: str, data: str, check: Optional[int] = 207,
  163. is_xml: Optional[bool] = True,
  164. **kwargs) -> Tuple[int, RESPONSES]:
  165. status, _, answer = self.request("REPORT", path, data, check=check,
  166. **kwargs)
  167. if status < 200 or 300 <= status:
  168. return status, {}
  169. assert answer is not None
  170. if is_xml:
  171. parsed = self.parse_responses(answer)
  172. else:
  173. parsed = self.parse_free_busy(answer)
  174. return status, parsed
  175. def delete(self, path: str, check: Optional[int] = 200, **kwargs
  176. ) -> Tuple[int, RESPONSES]:
  177. assert "data" not in kwargs
  178. status, _, answer = self.request("DELETE", path, check=check, **kwargs)
  179. if status < 200 or 300 <= status:
  180. return status, {}
  181. assert answer is not None
  182. responses = self.parse_responses(answer)
  183. assert len(responses) == 1 and path in responses
  184. return status, responses
  185. def mkcalendar(self, path: str, data: Optional[str] = None,
  186. check: Optional[int] = 201, **kwargs
  187. ) -> Tuple[int, str]:
  188. status, _, answer = self.request("MKCALENDAR", path, data, check=check,
  189. **kwargs)
  190. return status, answer
  191. def mkcol(self, path: str, data: Optional[str] = None,
  192. check: Optional[int] = 201, **kwargs) -> int:
  193. status, _, _ = self.request("MKCOL", path, data, check=check, **kwargs)
  194. return status
  195. def create_addressbook(self, path: str, check: Optional[int] = 201,
  196. **kwargs) -> int:
  197. assert "data" not in kwargs
  198. return self.mkcol(path, """\
  199. <?xml version="1.0" encoding="UTF-8" ?>
  200. <create xmlns="DAV:" xmlns:CR="urn:ietf:params:xml:ns:carddav">
  201. <set>
  202. <prop>
  203. <resourcetype>
  204. <collection />
  205. <CR:addressbook />
  206. </resourcetype>
  207. </prop>
  208. </set>
  209. </create>""", check=check, **kwargs)