test_server.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. # This file is part of Radicale - CalDAV and CardDAV server
  2. # Copyright © 2018-2019 Unrud <unrud@outlook.com>
  3. #
  4. # This library is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This library is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU General Public License
  15. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
  16. """
  17. Test the internal server.
  18. """
  19. import errno
  20. import os
  21. import socket
  22. import ssl
  23. import subprocess
  24. import sys
  25. import threading
  26. import time
  27. from configparser import RawConfigParser
  28. from http.client import HTTPMessage
  29. from typing import IO, Callable, Dict, Optional, Tuple, cast
  30. from urllib import request
  31. from urllib.error import HTTPError, URLError
  32. import pytest
  33. from radicale import config, server
  34. from radicale.tests import BaseTest
  35. from radicale.tests.helpers import configuration_to_dict, get_file_path
  36. class DisabledRedirectHandler(request.HTTPRedirectHandler):
  37. def redirect_request(
  38. self, req: request.Request, fp: IO[bytes], code: int, msg: str,
  39. headers: HTTPMessage, newurl: str) -> None:
  40. return None
  41. class TestBaseServerRequests(BaseTest):
  42. """Test the internal server."""
  43. shutdown_socket: socket.socket
  44. thread: threading.Thread
  45. opener: request.OpenerDirector
  46. def setup_method(self) -> None:
  47. super().setup_method()
  48. self.shutdown_socket, shutdown_socket_out = socket.socketpair()
  49. with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
  50. # Find available port
  51. sock.bind(("127.0.0.1", 0))
  52. self.sockfamily = socket.AF_INET
  53. self.sockname = sock.getsockname()
  54. self.configure({"server": {"hosts": "%s:%d" % self.sockname},
  55. # Enable debugging for new processes
  56. "logging": {"level": "debug"}})
  57. self.thread = threading.Thread(target=server.serve, args=(
  58. self.configuration, shutdown_socket_out))
  59. ssl_context = ssl.create_default_context()
  60. ssl_context.check_hostname = False
  61. ssl_context.verify_mode = ssl.CERT_NONE
  62. self.opener = request.build_opener(
  63. request.HTTPSHandler(context=ssl_context),
  64. DisabledRedirectHandler)
  65. def teardown_method(self) -> None:
  66. self.shutdown_socket.close()
  67. try:
  68. self.thread.join()
  69. except RuntimeError: # Thread never started
  70. pass
  71. super().teardown_method()
  72. def request(self, method: str, path: str, data: Optional[str] = None,
  73. check: Optional[int] = None, **kwargs
  74. ) -> Tuple[int, Dict[str, str], str]:
  75. """Send a request."""
  76. login = kwargs.pop("login", None)
  77. if login is not None and not isinstance(login, str):
  78. raise TypeError("login argument must be %r, not %r" %
  79. (str, type(login)))
  80. if login:
  81. raise NotImplementedError
  82. is_alive_fn: Optional[Callable[[], bool]] = kwargs.pop(
  83. "is_alive_fn", None)
  84. headers: Dict[str, str] = kwargs
  85. for k, v in headers.items():
  86. if not isinstance(v, str):
  87. raise TypeError("type of %r is %r, expected %r" %
  88. (k, type(v), str))
  89. if is_alive_fn is None:
  90. is_alive_fn = self.thread.is_alive
  91. encoding: str = self.configuration.get("encoding", "request")
  92. scheme = "https" if self.configuration.get("server", "ssl") else "http"
  93. data_bytes = None
  94. if data:
  95. data_bytes = data.encode(encoding)
  96. if self.sockfamily == socket.AF_INET6:
  97. req_host = ("[%s]" % self.sockname[0])
  98. else:
  99. req_host = self.sockname[0]
  100. req = request.Request(
  101. "%s://%s:%d%s" % (scheme, req_host, self.sockname[1], path),
  102. data=data_bytes, headers=headers, method=method)
  103. while True:
  104. assert is_alive_fn()
  105. try:
  106. with self.opener.open(req) as f:
  107. return f.getcode(), dict(f.info()), f.read().decode()
  108. except HTTPError as e:
  109. assert check is None or e.code == check, "%d != %d" % (e.code,
  110. check)
  111. return e.code, dict(e.headers), e.read().decode()
  112. except URLError as e:
  113. if not isinstance(e.reason, ConnectionRefusedError):
  114. raise
  115. time.sleep(0.1)
  116. def test_root(self) -> None:
  117. self.thread.start()
  118. self.get("/", check=302)
  119. def test_ssl(self) -> None:
  120. self.configure({"server": {"ssl": "True",
  121. "certificate": get_file_path("cert.pem"),
  122. "key": get_file_path("key.pem")}})
  123. self.thread.start()
  124. self.get("/", check=302)
  125. def test_bind_fail(self) -> None:
  126. for address_family, address in [(socket.AF_INET, "::1"),
  127. (socket.AF_INET6, "127.0.0.1")]:
  128. with socket.socket(address_family, socket.SOCK_STREAM) as sock:
  129. if address_family == socket.AF_INET6:
  130. # Only allow IPv6 connections to the IPv6 socket
  131. sock.setsockopt(server.COMPAT_IPPROTO_IPV6,
  132. socket.IPV6_V6ONLY, 1)
  133. with pytest.raises(OSError) as exc_info:
  134. sock.bind((address, 0))
  135. # See ``radicale.server.serve``
  136. assert (isinstance(exc_info.value, socket.gaierror) and
  137. exc_info.value.errno in (
  138. socket.EAI_NONAME, server.COMPAT_EAI_ADDRFAMILY,
  139. server.COMPAT_EAI_NODATA) or
  140. str(exc_info.value) == "address family mismatched" or
  141. exc_info.value.errno in (
  142. errno.EADDRNOTAVAIL, errno.EAFNOSUPPORT,
  143. errno.EPROTONOSUPPORT))
  144. def test_ipv6(self) -> None:
  145. try:
  146. with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock:
  147. # Only allow IPv6 connections to the IPv6 socket
  148. sock.setsockopt(
  149. server.COMPAT_IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
  150. # Find available port
  151. sock.bind(("::1", 0))
  152. self.sockfamily = socket.AF_INET6
  153. self.sockname = sock.getsockname()[:2]
  154. except OSError as e:
  155. if e.errno in (errno.EADDRNOTAVAIL, errno.EAFNOSUPPORT,
  156. errno.EPROTONOSUPPORT):
  157. pytest.skip("IPv6 not supported")
  158. raise
  159. self.configure({"server": {"hosts": "[%s]:%d" % self.sockname}})
  160. self.thread.start()
  161. self.get("/", check=302)
  162. def test_command_line_interface(self, with_bool_options=False) -> None:
  163. self.configure({"headers": {"Test-Server": "test"}})
  164. config_args = []
  165. for section in self.configuration.sections():
  166. if section.startswith("_"):
  167. continue
  168. for option in self.configuration.options(section):
  169. if option.startswith("_"):
  170. continue
  171. long_name = "--%s-%s" % (section, option.replace("_", "-"))
  172. if with_bool_options and config.DEFAULT_CONFIG_SCHEMA.get(
  173. section, {}).get(option, {}).get("type") == bool:
  174. if not cast(bool, self.configuration.get(section, option)):
  175. long_name = "--no%s" % long_name[1:]
  176. config_args.append(long_name)
  177. else:
  178. config_args.append(long_name)
  179. raw_value = self.configuration.get_raw(section, option)
  180. assert isinstance(raw_value, str)
  181. config_args.append(raw_value)
  182. config_args.append("--headers-Test-Header=test")
  183. p = subprocess.Popen(
  184. [sys.executable, "-m", "radicale"] + config_args,
  185. env={**os.environ, "PYTHONPATH": os.pathsep.join(sys.path)})
  186. try:
  187. status, headers, _ = self.request(
  188. "GET", "/", check=302, is_alive_fn=lambda: p.poll() is None)
  189. for key in self.configuration.options("headers"):
  190. assert headers.get(key) == self.configuration.get(
  191. "headers", key)
  192. finally:
  193. p.terminate()
  194. p.wait()
  195. if sys.platform != "win32":
  196. assert p.returncode == 0
  197. def test_command_line_interface_with_bool_options(self) -> None:
  198. self.test_command_line_interface(with_bool_options=True)
  199. def test_wsgi_server(self) -> None:
  200. config_path = os.path.join(self.colpath, "config")
  201. parser = RawConfigParser()
  202. parser.read_dict(configuration_to_dict(self.configuration))
  203. with open(config_path, "w") as f:
  204. parser.write(f)
  205. env = os.environ.copy()
  206. env["PYTHONPATH"] = os.pathsep.join(sys.path)
  207. env["RADICALE_CONFIG"] = config_path
  208. raw_server_hosts = self.configuration.get_raw("server", "hosts")
  209. assert isinstance(raw_server_hosts, str)
  210. p = subprocess.Popen([
  211. sys.executable, "-m", "waitress", "--listen", raw_server_hosts,
  212. "radicale:application"], env=env)
  213. try:
  214. self.get("/", is_alive_fn=lambda: p.poll() is None, check=302)
  215. finally:
  216. p.terminate()
  217. p.wait()