__init__.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. # This file is part of Radicale Server - Calendar Server
  2. # Copyright © 2012-2017 Guillaume Ayoub
  3. # Copyright © 2017-2018 Unrud <unrud@outlook.com>
  4. #
  5. # This library is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # (at your option) any later version.
  9. #
  10. # This library is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
  17. """
  18. Tests for Radicale.
  19. """
  20. import logging
  21. import os
  22. import sys
  23. from io import BytesIO
  24. import defusedxml.ElementTree as DefusedET
  25. from pytest_cov import embed
  26. import radicale
  27. from radicale import server, xmlutils
  # Measure coverage of forked processes.
  # Remember the PID of the importing process and the unpatched handler so
  # the wrapper below can tell whether it is running in another process and
  # can delegate to the original implementation.
  pid = os.getpid()
  finish_request = server.ParallelHTTPServer.finish_request


  def finish_request_cov(self, request, client_address):
      """Run the original ``finish_request`` with coverage in other processes.

      When the current PID differs from the PID recorded at import time,
      ``embed.init()`` (from ``pytest_cov``) is called before the request is
      handled and ``embed.cleanup()`` is called afterwards with the object it
      returned, provided that object is truthy. In the importing process the
      call is a plain pass-through.

      Returns whatever the original ``finish_request`` returns.
      """
      in_other_process = os.getpid() != pid
      cov = embed.init() if in_other_process else None
      try:
          return finish_request(self, request, client_address)
      finally:
          # Clean up even if the request handler raised.
          if cov:
              embed.cleanup(cov)


  # Install the wrapper in place of the original handler.
  server.ParallelHTTPServer.finish_request = finish_request_cov

  # Enable debug output
  radicale.log.logger.setLevel(logging.DEBUG)
  43. class BaseTest:
  44. """Base class for tests."""
  45. def request(self, method, path, data=None, **args):
  46. """Send a request."""
  47. for key in args:
  48. args[key.upper()] = args[key]
  49. args["REQUEST_METHOD"] = method.upper()
  50. args["PATH_INFO"] = path
  51. if data:
  52. data = data.encode()
  53. args["wsgi.input"] = BytesIO(data)
  54. args["CONTENT_LENGTH"] = str(len(data))
  55. args["wsgi.errors"] = sys.stderr
  56. status = headers = None
  57. def start_response(status_, headers_):
  58. nonlocal status, headers
  59. status = status_
  60. headers = headers_
  61. answer = self.application(args, start_response)
  62. return (int(status.split()[0]), dict(headers),
  63. answer[0].decode() if answer else None)
  @staticmethod
  def parse_responses(text):
      """Parse a ``D:multistatus`` XML document into a dict keyed by href.

      For every ``D:response`` element the text of its ``D:href`` is used as
      the key. The value is either

      * the integer status code, when the response carries its own
        ``D:status`` element (it must then contain no properties), or
      * a dict mapping each property's human-readable tag (as produced by
        ``xmlutils.make_human_tag``) to ``(status_code, element)``, collected
        from all ``D:propstat`` children.

      Assertions fail on a wrong root tag, duplicate hrefs, duplicate
      properties within one response, or status lines that do not start
      with ``HTTP/1.1 ``.
      """
      clark = xmlutils.make_clark

      def status_code_of(status_element):
          # "HTTP/1.1 <code> <reason>" -> <code> as int
          assert status_element.text.startswith("HTTP/1.1 ")
          return int(status_element.text.split(" ")[1])

      root = DefusedET.fromstring(text)
      assert root.tag == clark("D:multistatus")
      results = {}
      for response in root.findall(clark("D:response")):
          href = response.find(clark("D:href"))
          assert href.text not in results
          props = {}
          for propstat in response.findall(clark("D:propstat")):
              code = status_code_of(propstat.find(clark("D:status")))
              for prop in propstat.findall(clark("D:prop")):
                  for element in prop:
                      tag = xmlutils.make_human_tag(element.tag)
                      assert tag not in props
                      props[tag] = (code, element)
          response_status = response.find(clark("D:status"))
          if response_status is None:
              results[href.text] = props
              continue
          # A response-level status and per-property results are exclusive.
          assert not props
          results[href.text] = status_code_of(response_status)
      return results
  92. @staticmethod
  93. def _check_status(status, good_status, check=True):
  94. if check is not False:
  95. assert status in (good_status, check)
  96. return status == good_status
  97. def get(self, path, check=True, **args):
  98. status, _, answer = self.request("GET", path, **args)
  99. self._check_status(status, 200, check)
  100. return status, answer
  101. def put(self, path, data, check=True, **args):
  102. status, _, answer = self.request("PUT", path, data, **args)
  103. self._check_status(status, 201, check)
  104. return status
  105. def propfind(self, path, data=None, check=True, **args):
  106. status, _, answer = self.request("PROPFIND", path, data, **args)
  107. if not self._check_status(status, 207, check):
  108. return status, None
  109. responses = self.parse_responses(answer)
  110. if args.get("HTTP_DEPTH", 0) == 0:
  111. assert len(responses) == 1 and path in responses
  112. return status, responses
  113. def proppatch(self, path, data=None, check=True, **args):
  114. status, _, answer = self.request("PROPPATCH", path, data, **args)
  115. if not self._check_status(status, 207, check):
  116. return status, None
  117. responses = self.parse_responses(answer)
  118. assert len(responses) == 1 and path in responses
  119. return status, responses
  120. def report(self, path, data, check=True, **args):
  121. status, _, answer = self.request("REPORT", path, data, **args)
  122. if not self._check_status(status, 207, check):
  123. return status, None
  124. return status, self.parse_responses(answer)
  125. def delete(self, path, check=True, **args):
  126. status, _, answer = self.request("DELETE", path, **args)
  127. if not self._check_status(status, 200, check):
  128. return status, None
  129. responses = self.parse_responses(answer)
  130. assert len(responses) == 1 and path in responses
  131. return status, responses
  132. def mkcalendar(self, path, data=None, check=True, **args):
  133. status, _, _ = self.request("MKCALENDAR", path, data, **args)
  134. self._check_status(status, 201, check)
  135. return status
  136. def mkcol(self, path, data=None, check=True, **args):
  137. status, _, _ = self.request("MKCOL", path, data, **args)
  138. self._check_status(status, 201, check)
  139. return status
  140. def create_addressbook(self, path, check=True, **args):
  141. return self.mkcol(path, """\
  142. <?xml version="1.0" encoding="UTF-8" ?>
  143. <create xmlns="DAV:" xmlns:CR="urn:ietf:params:xml:ns:carddav">
  144. <set>
  145. <prop>
  146. <resourcetype>
  147. <collection />
  148. <CR:addressbook />
  149. </resourcetype>
  150. </prop>
  151. </set>
  152. </create>""", check=check, **args)