server.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. # This file is part of Radicale - CalDAV and CardDAV server
  2. # Copyright © 2008 Nicolas Kandel
  3. # Copyright © 2008 Pascal Halter
  4. # Copyright © 2008-2017 Guillaume Ayoub
  5. # Copyright © 2017-2019 Unrud <unrud@outlook.com>
  6. #
  7. # This library is free software: you can redistribute it and/or modify
  8. # it under the terms of the GNU General Public License as published by
  9. # the Free Software Foundation, either version 3 of the License, or
  10. # (at your option) any later version.
  11. #
  12. # This library is distributed in the hope that it will be useful,
  13. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. # GNU General Public License for more details.
  16. #
  17. # You should have received a copy of the GNU General Public License
  18. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
  19. """
  20. Built-in WSGI server.
  21. """
  22. import errno
  23. import http
  24. import select
  25. import socket
  26. import socketserver
  27. import ssl
  28. import sys
  29. import wsgiref.simple_server
  30. from typing import (Any, Callable, Dict, List, MutableMapping, Optional, Set,
  31. Tuple, Union)
  32. from urllib.parse import unquote
  33. from radicale import Application, config
  34. from radicale.log import logger
  35. COMPAT_EAI_ADDRFAMILY: int
  36. if hasattr(socket, "EAI_ADDRFAMILY"):
  37. COMPAT_EAI_ADDRFAMILY = socket.EAI_ADDRFAMILY # type:ignore[attr-defined]
  38. elif hasattr(socket, "EAI_NONAME"):
  39. # Windows and BSD don't have a special error code for this
  40. COMPAT_EAI_ADDRFAMILY = socket.EAI_NONAME
  41. COMPAT_EAI_NODATA: int
  42. if hasattr(socket, "EAI_NODATA"):
  43. COMPAT_EAI_NODATA = socket.EAI_NODATA
  44. elif hasattr(socket, "EAI_NONAME"):
  45. # Windows and BSD don't have a special error code for this
  46. COMPAT_EAI_NODATA = socket.EAI_NONAME
  47. COMPAT_IPPROTO_IPV6: int
  48. if hasattr(socket, "IPPROTO_IPV6"):
  49. COMPAT_IPPROTO_IPV6 = socket.IPPROTO_IPV6
  50. elif sys.platform == "win32":
  51. # HACK: https://bugs.python.org/issue29515
  52. COMPAT_IPPROTO_IPV6 = 41
  53. # IPv4 (host, port) and IPv6 (host, port, flowinfo, scopeid)
  54. ADDRESS_TYPE = Union[Tuple[str, int], Tuple[str, int, int, int]]
  55. def format_address(address: ADDRESS_TYPE) -> str:
  56. return "[%s]:%d" % address[:2]
  57. class ParallelHTTPServer(socketserver.ThreadingMixIn,
  58. wsgiref.simple_server.WSGIServer):
  59. configuration: config.Configuration
  60. worker_sockets: Set[socket.socket]
  61. _timeout: float
  62. # We wait for child threads ourself (ThreadingMixIn)
  63. block_on_close: bool = False
  64. daemon_threads: bool = True
  65. def __init__(self, configuration: config.Configuration, family: int,
  66. address: Tuple[str, int], RequestHandlerClass:
  67. Callable[..., http.server.BaseHTTPRequestHandler]) -> None:
  68. self.configuration = configuration
  69. self.address_family = family
  70. super().__init__(address, RequestHandlerClass)
  71. self.worker_sockets = set()
  72. self._timeout = configuration.get("server", "timeout")
  73. def server_bind(self) -> None:
  74. if self.address_family == socket.AF_INET6:
  75. # Only allow IPv6 connections to the IPv6 socket
  76. self.socket.setsockopt(COMPAT_IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
  77. super().server_bind()
  78. def get_request( # type:ignore[override]
  79. self) -> Tuple[socket.socket, Tuple[ADDRESS_TYPE, socket.socket]]:
  80. # Set timeout for client
  81. request: socket.socket
  82. client_address: ADDRESS_TYPE
  83. request, client_address = super().get_request() # type:ignore[misc]
  84. if self._timeout > 0:
  85. request.settimeout(self._timeout)
  86. worker_socket, worker_socket_out = socket.socketpair()
  87. self.worker_sockets.add(worker_socket_out)
  88. # HACK: Forward `worker_socket` via `client_address` return value
  89. # to worker thread.
  90. # The super class calls `verify_request`, `process_request` and
  91. # `handle_error` with modified `client_address` value.
  92. return request, (client_address, worker_socket)
  93. def verify_request( # type:ignore[override]
  94. self, request: socket.socket, client_address_and_socket:
  95. Tuple[ADDRESS_TYPE, socket.socket]) -> bool:
  96. return True
  97. def process_request( # type:ignore[override]
  98. self, request: socket.socket, client_address_and_socket:
  99. Tuple[ADDRESS_TYPE, socket.socket]) -> None:
  100. # HACK: Super class calls `finish_request` in new thread with
  101. # `client_address_and_socket`
  102. return super().process_request(
  103. request, client_address_and_socket) # type:ignore[arg-type]
  104. def finish_request( # type:ignore[override]
  105. self, request: socket.socket, client_address_and_socket:
  106. Tuple[ADDRESS_TYPE, socket.socket]) -> None:
  107. # HACK: Unpack `client_address_and_socket` and call super class
  108. # `finish_request` with original `client_address`
  109. client_address, worker_socket = client_address_and_socket
  110. try:
  111. return self.finish_request_locked(request, client_address)
  112. finally:
  113. worker_socket.close()
  114. def finish_request_locked(self, request: socket.socket,
  115. client_address: ADDRESS_TYPE) -> None:
  116. return super().finish_request(
  117. request, client_address) # type:ignore[arg-type]
  118. def handle_error( # type:ignore[override]
  119. self, request: socket.socket,
  120. client_address_or_client_address_and_socket:
  121. Union[ADDRESS_TYPE, Tuple[ADDRESS_TYPE, socket.socket]]) -> None:
  122. # HACK: This method can be called with the modified
  123. # `client_address_and_socket` or the original `client_address` value
  124. e = sys.exc_info()[1]
  125. assert e is not None
  126. if isinstance(e, socket.timeout):
  127. logger.info("Client timed out", exc_info=True)
  128. else:
  129. logger.error("An exception occurred during request: %s",
  130. sys.exc_info()[1], exc_info=True)
  131. class ParallelHTTPSServer(ParallelHTTPServer):
  132. def server_bind(self) -> None:
  133. super().server_bind()
  134. # Wrap the TCP socket in an SSL socket
  135. certfile: str = self.configuration.get("server", "certificate")
  136. keyfile: str = self.configuration.get("server", "key")
  137. cafile: str = self.configuration.get("server", "certificate_authority")
  138. # Test if the files can be read
  139. for name, filename in [("certificate", certfile), ("key", keyfile),
  140. ("certificate_authority", cafile)]:
  141. type_name = config.DEFAULT_CONFIG_SCHEMA["server"][name][
  142. "type"].__name__
  143. source = self.configuration.get_source("server", name)
  144. if name == "certificate_authority" and not filename:
  145. continue
  146. try:
  147. open(filename, "r").close()
  148. except OSError as e:
  149. raise RuntimeError(
  150. "Invalid %s value for option %r in section %r in %s: %r "
  151. "(%s)" % (type_name, name, "server", source, filename,
  152. e)) from e
  153. context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
  154. context.load_cert_chain(certfile=certfile, keyfile=keyfile)
  155. if cafile:
  156. context.load_verify_locations(cafile=cafile)
  157. context.verify_mode = ssl.CERT_REQUIRED
  158. self.socket = context.wrap_socket(
  159. self.socket, server_side=True, do_handshake_on_connect=False)
  160. def finish_request_locked( # type:ignore[override]
  161. self, request: ssl.SSLSocket, client_address: ADDRESS_TYPE
  162. ) -> None:
  163. try:
  164. try:
  165. request.do_handshake()
  166. except socket.timeout:
  167. raise
  168. except Exception as e:
  169. raise RuntimeError("SSL handshake failed: %s" % e) from e
  170. except Exception:
  171. try:
  172. self.handle_error(request, client_address)
  173. finally:
  174. self.shutdown_request(request) # type:ignore[attr-defined]
  175. return
  176. return super().finish_request_locked(request, client_address)
  177. class ServerHandler(wsgiref.simple_server.ServerHandler):
  178. # Don't pollute WSGI environ with OS environment
  179. os_environ: MutableMapping[str, str] = {}
  180. def log_exception(self, exc_info: "wsgiref.handlers._exc_info") -> None:
  181. logger.error("An exception occurred during request: %s",
  182. exc_info[1], exc_info=exc_info) # type:ignore[arg-type]
  183. class RequestHandler(wsgiref.simple_server.WSGIRequestHandler):
  184. """HTTP requests handler."""
  185. # HACK: Assigned in `socketserver.StreamRequestHandler`
  186. connection: socket.socket
  187. def log_request(self, code: Union[int, str] = "-",
  188. size: Union[int, str] = "-") -> None:
  189. pass # Disable request logging.
  190. def log_error(self, format_: str, *args: Any) -> None:
  191. logger.error("An error occurred during request: %s", format_ % args)
  192. def get_environ(self) -> Dict[str, Any]:
  193. env = super().get_environ()
  194. if isinstance(self.connection, ssl.SSLSocket):
  195. # The certificate can be evaluated by the auth module
  196. env["REMOTE_CERTIFICATE"] = self.connection.getpeercert()
  197. # Parent class only tries latin1 encoding
  198. env["PATH_INFO"] = unquote(self.path.split("?", 1)[0])
  199. return env
  200. def handle(self) -> None:
  201. """Copy of WSGIRequestHandler.handle with different ServerHandler"""
  202. self.raw_requestline = self.rfile.readline(65537)
  203. if len(self.raw_requestline) > 65536:
  204. self.requestline = ""
  205. self.request_version = ""
  206. self.command = ""
  207. self.send_error(414)
  208. return
  209. if not self.parse_request():
  210. return
  211. handler = ServerHandler(
  212. self.rfile, self.wfile, self.get_stderr(), self.get_environ()
  213. )
  214. handler.request_handler = self # type:ignore[attr-defined]
  215. app = self.server.get_app() # type:ignore[attr-defined]
  216. handler.run(app)
  217. def serve(configuration: config.Configuration,
  218. shutdown_socket: Optional[socket.socket] = None) -> None:
  219. """Serve radicale from configuration.
  220. `shutdown_socket` can be used to gracefully shutdown the server.
  221. The socket can be created with `socket.socketpair()`, when the other socket
  222. gets closed the server stops accepting new requests by clients and the
  223. function returns after all active requests are finished.
  224. """
  225. logger.info("Starting Radicale")
  226. # Copy configuration before modifying
  227. configuration = configuration.copy()
  228. configuration.update({"server": {"_internal_server": "True"}}, "server",
  229. privileged=True)
  230. use_ssl: bool = configuration.get("server", "ssl")
  231. server_class = ParallelHTTPSServer if use_ssl else ParallelHTTPServer
  232. application = Application(configuration)
  233. servers = {}
  234. try:
  235. hosts: List[Tuple[str, int]] = configuration.get("server", "hosts")
  236. for address in hosts:
  237. # Try to bind sockets for IPv4 and IPv6
  238. possible_families = (socket.AF_INET, socket.AF_INET6)
  239. bind_ok = False
  240. for i, family in enumerate(possible_families):
  241. is_last = i == len(possible_families) - 1
  242. try:
  243. server = server_class(configuration, family, address,
  244. RequestHandler)
  245. except OSError as e:
  246. # Ignore unsupported families (only one must work)
  247. if ((bind_ok or not is_last) and (
  248. isinstance(e, socket.gaierror) and (
  249. # Hostname does not exist or doesn't have
  250. # address for address family
  251. # macOS: IPv6 address for INET address family
  252. e.errno == socket.EAI_NONAME or
  253. # Address not for address family
  254. e.errno == COMPAT_EAI_ADDRFAMILY or
  255. e.errno == COMPAT_EAI_NODATA) or
  256. # Workaround for PyPy
  257. str(e) == "address family mismatched" or
  258. # Address family not available (e.g. IPv6 disabled)
  259. # macOS: IPv4 address for INET6 address family with
  260. # IPV6_V6ONLY set
  261. e.errno == errno.EADDRNOTAVAIL or
  262. # Address family not supported
  263. e.errno == errno.EAFNOSUPPORT or
  264. # Protocol not supported
  265. e.errno == errno.EPROTONOSUPPORT)):
  266. continue
  267. raise RuntimeError("Failed to start server %r: %s" % (
  268. format_address(address), e)) from e
  269. servers[server.socket] = server
  270. bind_ok = True
  271. server.set_app(application)
  272. logger.info("Listening on %r%s",
  273. format_address(server.server_address),
  274. " with SSL" if use_ssl else "")
  275. if not servers:
  276. raise RuntimeError("No servers started")
  277. # Mainloop
  278. select_timeout = None
  279. if sys.platform == "win32":
  280. # Fallback to busy waiting. (select(...) blocks SIGINT on Windows.)
  281. select_timeout = 1.0
  282. max_connections: int = configuration.get("server", "max_connections")
  283. logger.info("Radicale server ready")
  284. while True:
  285. rlist: List[socket.socket] = []
  286. # Wait for finished clients
  287. for server in servers.values():
  288. rlist.extend(server.worker_sockets)
  289. # Accept new connections if max_connections is not reached
  290. if max_connections <= 0 or len(rlist) < max_connections:
  291. rlist.extend(servers)
  292. # Use socket to get notified of program shutdown
  293. if shutdown_socket is not None:
  294. rlist.append(shutdown_socket)
  295. rlist, _, _ = select.select(rlist, [], [], select_timeout)
  296. rset = set(rlist)
  297. if shutdown_socket in rset:
  298. logger.info("Stopping Radicale")
  299. break
  300. for server in servers.values():
  301. finished_sockets = server.worker_sockets.intersection(rset)
  302. for s in finished_sockets:
  303. s.close()
  304. server.worker_sockets.remove(s)
  305. rset.remove(s)
  306. if finished_sockets:
  307. server.service_actions()
  308. if rset:
  309. active_server = servers.get(rset.pop())
  310. if active_server:
  311. active_server.handle_request()
  312. finally:
  313. # Wait for clients to finish and close servers
  314. for server in servers.values():
  315. for s in server.worker_sockets:
  316. s.recv(1)
  317. s.close()
  318. server.server_close()