__init__.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. # This file is part of Radicale - CalDAV and CardDAV server
  2. # Copyright © 2012-2017 Guillaume Ayoub
  3. # Copyright © 2017-2018 Unrud <unrud@outlook.com>
  4. #
  5. # This library is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # (at your option) any later version.
  9. #
  10. # This library is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
  17. """
  18. Tests for Radicale.
  19. """
  20. import base64
  21. import logging
  22. import shutil
  23. import sys
  24. import tempfile
  25. import wsgiref.util
  26. import xml.etree.ElementTree as ET
  27. import vobject
  28. from io import BytesIO
  29. from typing import Any, Dict, List, Optional, Tuple, Union
  30. import defusedxml.ElementTree as DefusedET
  31. import radicale
  32. from radicale import app, config, types, xmlutils
# Result type of the response-parsing helpers of ``BaseTest``. Keys are
# strings (the href of each multistatus response, or "" for a free-busy
# report); values are either a bare status code, a mapping of property name
# to ``(status code, element)``, or a parsed vobject component.
RESPONSES = Dict[str, Union[int, Dict[str, Tuple[int, ET.Element]], vobject.base.Component]]

# Enable debug output
radicale.log.logger.setLevel(logging.DEBUG)
class BaseTest:
    """Base class for tests."""

    # Temporary directory used as the storage folder; created in
    # ``setup_method`` and removed in ``teardown_method``.
    colpath: str
    # Configuration loaded in ``setup_method`` and updated by ``configure``.
    configuration: config.Configuration
    # Application under test; rebuilt by every call to ``configure``.
    application: app.Application
  41. def setup_method(self) -> None:
  42. self.configuration = config.load()
  43. self.colpath = tempfile.mkdtemp()
  44. self.configure({
  45. "storage": {"filesystem_folder": self.colpath,
  46. # Disable syncing to disk for better performance
  47. "_filesystem_fsync": "False"},
  48. # Set incorrect authentication delay to a short duration
  49. "auth": {"delay": "0.001"}})
  50. def configure(self, config_: types.CONFIG) -> None:
  51. self.configuration.update(config_, "test", privileged=True)
  52. self.application = app.Application(self.configuration)
  53. def teardown_method(self) -> None:
  54. shutil.rmtree(self.colpath)
  55. def request(self, method: str, path: str, data: Optional[str] = None,
  56. check: Optional[int] = None, **kwargs
  57. ) -> Tuple[int, Dict[str, str], str]:
  58. """Send a request."""
  59. login = kwargs.pop("login", None)
  60. if login is not None and not isinstance(login, str):
  61. raise TypeError("login argument must be %r, not %r" %
  62. (str, type(login)))
  63. environ: Dict[str, Any] = {k.upper(): v for k, v in kwargs.items()}
  64. for k, v in environ.items():
  65. if not isinstance(v, str):
  66. raise TypeError("type of %r is %r, expected %r" %
  67. (k, type(v), str))
  68. encoding: str = self.configuration.get("encoding", "request")
  69. if login:
  70. environ["HTTP_AUTHORIZATION"] = "Basic " + base64.b64encode(
  71. login.encode(encoding)).decode()
  72. environ["REQUEST_METHOD"] = method.upper()
  73. environ["PATH_INFO"] = path
  74. if data is not None:
  75. data_bytes = data.encode(encoding)
  76. environ["wsgi.input"] = BytesIO(data_bytes)
  77. environ["CONTENT_LENGTH"] = str(len(data_bytes))
  78. environ["wsgi.errors"] = sys.stderr
  79. wsgiref.util.setup_testing_defaults(environ)
  80. status = headers = None
  81. def start_response(status_: str, headers_: List[Tuple[str, str]]
  82. ) -> None:
  83. nonlocal status, headers
  84. status = int(status_.split()[0])
  85. headers = dict(headers_)
  86. answers = list(self.application(environ, start_response))
  87. assert status is not None and headers is not None
  88. assert check is None or status == check, "%d != %d" % (status, check)
  89. return status, headers, answers[0].decode() if answers else ""
  90. @staticmethod
  91. def parse_responses(text: str) -> RESPONSES:
  92. xml = DefusedET.fromstring(text)
  93. assert xml.tag == xmlutils.make_clark("D:multistatus")
  94. path_responses: RESPONSES = {}
  95. for response in xml.findall(xmlutils.make_clark("D:response")):
  96. href = response.find(xmlutils.make_clark("D:href"))
  97. assert href.text not in path_responses
  98. prop_respones: Dict[str, Tuple[int, ET.Element]] = {}
  99. for propstat in response.findall(
  100. xmlutils.make_clark("D:propstat")):
  101. status = propstat.find(xmlutils.make_clark("D:status"))
  102. assert status.text.startswith("HTTP/1.1 ")
  103. status_code = int(status.text.split(" ")[1])
  104. for element in propstat.findall(
  105. "./%s/*" % xmlutils.make_clark("D:prop")):
  106. human_tag = xmlutils.make_human_tag(element.tag)
  107. assert human_tag not in prop_respones
  108. prop_respones[human_tag] = (status_code, element)
  109. status = response.find(xmlutils.make_clark("D:status"))
  110. if status is not None:
  111. assert not prop_respones
  112. assert status.text.startswith("HTTP/1.1 ")
  113. status_code = int(status.text.split(" ")[1])
  114. path_responses[href.text] = status_code
  115. else:
  116. path_responses[href.text] = prop_respones
  117. return path_responses
  118. @staticmethod
  119. def parse_free_busy(text: str) -> RESPONSES:
  120. path_responses: RESPONSES = {}
  121. path_responses[""] = vobject.readOne(text)
  122. return path_responses
  123. def get(self, path: str, check: Optional[int] = 200, **kwargs
  124. ) -> Tuple[int, str]:
  125. assert "data" not in kwargs
  126. status, _, answer = self.request("GET", path, check=check, **kwargs)
  127. return status, answer
  128. def post(self, path: str, data: Optional[str] = None,
  129. check: Optional[int] = 200, **kwargs) -> Tuple[int, str]:
  130. status, _, answer = self.request("POST", path, data, check=check,
  131. **kwargs)
  132. return status, answer
  133. def put(self, path: str, data: str, check: Optional[int] = 201,
  134. **kwargs) -> Tuple[int, str]:
  135. status, _, answer = self.request("PUT", path, data, check=check,
  136. **kwargs)
  137. return status, answer
  138. def propfind(self, path: str, data: Optional[str] = None,
  139. check: Optional[int] = 207, **kwargs
  140. ) -> Tuple[int, RESPONSES]:
  141. status, _, answer = self.request("PROPFIND", path, data, check=check,
  142. **kwargs)
  143. if status < 200 or 300 <= status:
  144. return status, {}
  145. assert answer is not None
  146. responses = self.parse_responses(answer)
  147. if kwargs.get("HTTP_DEPTH", "0") == "0":
  148. assert len(responses) == 1 and path in responses
  149. return status, responses
  150. def proppatch(self, path: str, data: Optional[str] = None,
  151. check: Optional[int] = 207, **kwargs
  152. ) -> Tuple[int, RESPONSES]:
  153. status, _, answer = self.request("PROPPATCH", path, data, check=check,
  154. **kwargs)
  155. if status < 200 or 300 <= status:
  156. return status, {}
  157. assert answer is not None
  158. responses = self.parse_responses(answer)
  159. assert len(responses) == 1 and path in responses
  160. return status, responses
  161. def report(self, path: str, data: str, check: Optional[int] = 207,
  162. is_xml: Optional[bool] = True,
  163. **kwargs) -> Tuple[int, RESPONSES]:
  164. status, _, answer = self.request("REPORT", path, data, check=check,
  165. **kwargs)
  166. if status < 200 or 300 <= status:
  167. return status, {}
  168. assert answer is not None
  169. if is_xml:
  170. parsed = self.parse_responses(answer)
  171. else:
  172. parsed = self.parse_free_busy(answer)
  173. return status, parsed
  174. def delete(self, path: str, check: Optional[int] = 200, **kwargs
  175. ) -> Tuple[int, RESPONSES]:
  176. assert "data" not in kwargs
  177. status, _, answer = self.request("DELETE", path, check=check, **kwargs)
  178. if status < 200 or 300 <= status:
  179. return status, {}
  180. assert answer is not None
  181. responses = self.parse_responses(answer)
  182. assert len(responses) == 1 and path in responses
  183. return status, responses
  184. def mkcalendar(self, path: str, data: Optional[str] = None,
  185. check: Optional[int] = 201, **kwargs
  186. ) -> Tuple[int, str]:
  187. status, _, answer = self.request("MKCALENDAR", path, data, check=check,
  188. **kwargs)
  189. return status, answer
  190. def mkcol(self, path: str, data: Optional[str] = None,
  191. check: Optional[int] = 201, **kwargs) -> int:
  192. status, _, _ = self.request("MKCOL", path, data, check=check, **kwargs)
  193. return status
  194. def create_addressbook(self, path: str, check: Optional[int] = 201,
  195. **kwargs) -> int:
  196. assert "data" not in kwargs
  197. return self.mkcol(path, """\
  198. <?xml version="1.0" encoding="UTF-8" ?>
  199. <create xmlns="DAV:" xmlns:CR="urn:ietf:params:xml:ns:carddav">
  200. <set>
  201. <prop>
  202. <resourcetype>
  203. <collection />
  204. <CR:addressbook />
  205. </resourcetype>
  206. </prop>
  207. </set>
  208. </create>""", check=check, **kwargs)