server.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
# This file is part of Radicale - CalDAV and CardDAV server
# Copyright © 2008 Nicolas Kandel
# Copyright © 2008 Pascal Halter
# Copyright © 2008-2017 Guillaume Ayoub
# Copyright © 2017-2019 Unrud <unrud@outlook.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Radicale.  If not, see <http://www.gnu.org/licenses/>.

"""
Built-in WSGI server.
"""
  22. import errno
  23. import http
  24. import select
  25. import socket
  26. import socketserver
  27. import ssl
  28. import sys
  29. import wsgiref.simple_server
  30. from typing import (Any, Callable, Dict, List, MutableMapping, Optional, Set,
  31. Tuple, Union)
  32. from urllib.parse import unquote
  33. from radicale import Application, config
  34. from radicale.log import logger
  35. COMPAT_EAI_ADDRFAMILY: int
  36. if hasattr(socket, "EAI_ADDRFAMILY"):
  37. COMPAT_EAI_ADDRFAMILY = socket.EAI_ADDRFAMILY # type:ignore[attr-defined]
  38. elif hasattr(socket, "EAI_NONAME"):
  39. # Windows and BSD don't have a special error code for this
  40. COMPAT_EAI_ADDRFAMILY = socket.EAI_NONAME
  41. COMPAT_EAI_NODATA: int
  42. if hasattr(socket, "EAI_NODATA"):
  43. COMPAT_EAI_NODATA = socket.EAI_NODATA
  44. elif hasattr(socket, "EAI_NONAME"):
  45. # Windows and BSD don't have a special error code for this
  46. COMPAT_EAI_NODATA = socket.EAI_NONAME
  47. COMPAT_IPPROTO_IPV6: int
  48. if hasattr(socket, "IPPROTO_IPV6"):
  49. COMPAT_IPPROTO_IPV6 = socket.IPPROTO_IPV6
  50. elif sys.platform == "win32":
  51. # HACK: https://bugs.python.org/issue29515
  52. COMPAT_IPPROTO_IPV6 = 41
  53. # IPv4 (host, port) and IPv6 (host, port, flowinfo, scopeid)
  54. ADDRESS_TYPE = Union[Tuple[Union[str, bytes, bytearray], int],
  55. Tuple[str, int, int, int]]
  56. def format_address(address: ADDRESS_TYPE) -> str:
  57. host, port, *_ = address
  58. if not isinstance(host, str):
  59. raise NotImplementedError("Unsupported address format: %r" %
  60. (address,))
  61. return "[%s]:%d" % (host, port)
  62. class ParallelHTTPServer(socketserver.ThreadingMixIn,
  63. wsgiref.simple_server.WSGIServer):
  64. configuration: config.Configuration
  65. worker_sockets: Set[socket.socket]
  66. _timeout: float
  67. # We wait for child threads ourself (ThreadingMixIn)
  68. block_on_close: bool = False
  69. daemon_threads: bool = True
  70. def __init__(self, configuration: config.Configuration, family: int,
  71. address: Tuple[str, int], RequestHandlerClass:
  72. Callable[..., http.server.BaseHTTPRequestHandler]) -> None:
  73. self.configuration = configuration
  74. self.address_family = family
  75. super().__init__(address, RequestHandlerClass)
  76. self.worker_sockets = set()
  77. self._timeout = configuration.get("server", "timeout")
  78. def server_bind(self) -> None:
  79. if self.address_family == socket.AF_INET6:
  80. # Only allow IPv6 connections to the IPv6 socket
  81. self.socket.setsockopt(COMPAT_IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
  82. super().server_bind()
  83. def get_request( # type:ignore[override]
  84. self) -> Tuple[socket.socket, Tuple[ADDRESS_TYPE, socket.socket]]:
  85. # Set timeout for client
  86. request: socket.socket
  87. client_address: ADDRESS_TYPE
  88. request, client_address = super().get_request() # type:ignore[misc]
  89. if self._timeout > 0:
  90. request.settimeout(self._timeout)
  91. worker_socket, worker_socket_out = socket.socketpair()
  92. self.worker_sockets.add(worker_socket_out)
  93. # HACK: Forward `worker_socket` via `client_address` return value
  94. # to worker thread.
  95. # The super class calls `verify_request`, `process_request` and
  96. # `handle_error` with modified `client_address` value.
  97. return request, (client_address, worker_socket)
  98. def verify_request( # type:ignore[override]
  99. self, request: socket.socket, client_address_and_socket:
  100. Tuple[ADDRESS_TYPE, socket.socket]) -> bool:
  101. return True
  102. def process_request( # type:ignore[override]
  103. self, request: socket.socket, client_address_and_socket:
  104. Tuple[ADDRESS_TYPE, socket.socket]) -> None:
  105. # HACK: Super class calls `finish_request` in new thread with
  106. # `client_address_and_socket`
  107. return super().process_request(
  108. request, client_address_and_socket) # type:ignore[arg-type]
  109. def finish_request( # type:ignore[override]
  110. self, request: socket.socket, client_address_and_socket:
  111. Tuple[ADDRESS_TYPE, socket.socket]) -> None:
  112. # HACK: Unpack `client_address_and_socket` and call super class
  113. # `finish_request` with original `client_address`
  114. client_address, worker_socket = client_address_and_socket
  115. try:
  116. return self.finish_request_locked(request, client_address)
  117. finally:
  118. worker_socket.close()
  119. def finish_request_locked(self, request: socket.socket,
  120. client_address: ADDRESS_TYPE) -> None:
  121. return super().finish_request(
  122. request, client_address) # type:ignore[arg-type]
  123. def handle_error( # type:ignore[override]
  124. self, request: socket.socket,
  125. client_address_or_client_address_and_socket:
  126. Union[ADDRESS_TYPE, Tuple[ADDRESS_TYPE, socket.socket]]) -> None:
  127. # HACK: This method can be called with the modified
  128. # `client_address_and_socket` or the original `client_address` value
  129. e = sys.exc_info()[1]
  130. assert e is not None
  131. if isinstance(e, socket.timeout):
  132. logger.info("Client timed out", exc_info=True)
  133. else:
  134. logger.error("An exception occurred during request: %s",
  135. sys.exc_info()[1], exc_info=True)
  136. class ParallelHTTPSServer(ParallelHTTPServer):
  137. def server_bind(self) -> None:
  138. super().server_bind()
  139. # Wrap the TCP socket in an SSL socket
  140. certfile: str = self.configuration.get("server", "certificate")
  141. keyfile: str = self.configuration.get("server", "key")
  142. cafile: str = self.configuration.get("server", "certificate_authority")
  143. # Test if the files can be read
  144. for name, filename in [("certificate", certfile), ("key", keyfile),
  145. ("certificate_authority", cafile)]:
  146. type_name = config.DEFAULT_CONFIG_SCHEMA["server"][name][
  147. "type"].__name__
  148. source = self.configuration.get_source("server", name)
  149. if name == "certificate_authority" and not filename:
  150. continue
  151. try:
  152. open(filename, "r").close()
  153. except OSError as e:
  154. raise RuntimeError(
  155. "Invalid %s value for option %r in section %r in %s: %r "
  156. "(%s)" % (type_name, name, "server", source, filename,
  157. e)) from e
  158. context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
  159. context.load_cert_chain(certfile=certfile, keyfile=keyfile)
  160. if cafile:
  161. context.load_verify_locations(cafile=cafile)
  162. context.verify_mode = ssl.CERT_REQUIRED
  163. self.socket = context.wrap_socket(
  164. self.socket, server_side=True, do_handshake_on_connect=False)
  165. def finish_request_locked( # type:ignore[override]
  166. self, request: ssl.SSLSocket, client_address: ADDRESS_TYPE
  167. ) -> None:
  168. try:
  169. try:
  170. request.do_handshake()
  171. except socket.timeout:
  172. raise
  173. except Exception as e:
  174. raise RuntimeError("SSL handshake failed: %s" % e) from e
  175. except Exception:
  176. try:
  177. self.handle_error(request, client_address)
  178. finally:
  179. self.shutdown_request(request) # type:ignore[attr-defined]
  180. return
  181. return super().finish_request_locked(request, client_address)
  182. class ServerHandler(wsgiref.simple_server.ServerHandler):
  183. # Don't pollute WSGI environ with OS environment
  184. os_environ: MutableMapping[str, str] = {}
  185. def log_exception(self, exc_info) -> None:
  186. logger.error("An exception occurred during request: %s",
  187. exc_info[1], exc_info=exc_info) # type:ignore[arg-type]
  188. class RequestHandler(wsgiref.simple_server.WSGIRequestHandler):
  189. """HTTP requests handler."""
  190. # HACK: Assigned in `socketserver.StreamRequestHandler`
  191. connection: socket.socket
  192. def log_request(self, code: Union[int, str] = "-",
  193. size: Union[int, str] = "-") -> None:
  194. pass # Disable request logging.
  195. def log_error(self, format_: str, *args: Any) -> None:
  196. logger.error("An error occurred during request: %s", format_ % args)
  197. def get_environ(self) -> Dict[str, Any]:
  198. env = super().get_environ()
  199. if isinstance(self.connection, ssl.SSLSocket):
  200. # The certificate can be evaluated by the auth module
  201. env["REMOTE_CERTIFICATE"] = self.connection.getpeercert()
  202. # Parent class only tries latin1 encoding
  203. env["PATH_INFO"] = unquote(self.path.split("?", 1)[0])
  204. return env
  205. def handle(self) -> None:
  206. """Copy of WSGIRequestHandler.handle with different ServerHandler"""
  207. self.raw_requestline = self.rfile.readline(65537)
  208. if len(self.raw_requestline) > 65536:
  209. self.requestline = ""
  210. self.request_version = ""
  211. self.command = ""
  212. self.send_error(414)
  213. return
  214. if not self.parse_request():
  215. return
  216. handler = ServerHandler(
  217. self.rfile, self.wfile, self.get_stderr(), self.get_environ()
  218. )
  219. handler.request_handler = self # type:ignore[attr-defined]
  220. app = self.server.get_app() # type:ignore[attr-defined]
  221. handler.run(app)
  222. def serve(configuration: config.Configuration,
  223. shutdown_socket: Optional[socket.socket] = None) -> None:
  224. """Serve radicale from configuration.
  225. `shutdown_socket` can be used to gracefully shutdown the server.
  226. The socket can be created with `socket.socketpair()`, when the other socket
  227. gets closed the server stops accepting new requests by clients and the
  228. function returns after all active requests are finished.
  229. """
  230. logger.info("Starting Radicale")
  231. # Copy configuration before modifying
  232. configuration = configuration.copy()
  233. configuration.update({"server": {"_internal_server": "True"}}, "server",
  234. privileged=True)
  235. use_ssl: bool = configuration.get("server", "ssl")
  236. server_class = ParallelHTTPSServer if use_ssl else ParallelHTTPServer
  237. application = Application(configuration)
  238. servers = {}
  239. try:
  240. hosts: List[Tuple[str, int]] = configuration.get("server", "hosts")
  241. for address in hosts:
  242. # Try to bind sockets for IPv4 and IPv6
  243. possible_families = (socket.AF_INET, socket.AF_INET6)
  244. bind_ok = False
  245. for i, family in enumerate(possible_families):
  246. is_last = i == len(possible_families) - 1
  247. try:
  248. server = server_class(configuration, family, address,
  249. RequestHandler)
  250. except OSError as e:
  251. # Ignore unsupported families (only one must work)
  252. if ((bind_ok or not is_last) and (
  253. isinstance(e, socket.gaierror) and (
  254. # Hostname does not exist or doesn't have
  255. # address for address family
  256. # Linux: temporary failure in name resolution (-3)
  257. e.errno == socket.EAI_AGAIN or
  258. # macOS: IPv6 address for INET address family
  259. e.errno == socket.EAI_NONAME or
  260. # Address not for address family
  261. e.errno == COMPAT_EAI_ADDRFAMILY or
  262. e.errno == COMPAT_EAI_NODATA) or
  263. # Workaround for PyPy
  264. str(e) == "address family mismatched" or
  265. # Address family not available (e.g. IPv6 disabled)
  266. # macOS: IPv4 address for INET6 address family with
  267. # IPV6_V6ONLY set
  268. e.errno == errno.EADDRNOTAVAIL or
  269. # Device or resource busy (16)
  270. e.errno == errno.EBUSY or
  271. # Address family not supported
  272. e.errno == errno.EAFNOSUPPORT or
  273. # Protocol not supported
  274. e.errno == errno.EPROTONOSUPPORT)):
  275. continue
  276. raise RuntimeError("Failed to start server %r: %s" % (
  277. format_address(address), e)) from e
  278. servers[server.socket] = server
  279. bind_ok = True
  280. server.set_app(application)
  281. logger.info("Listening on %r%s",
  282. format_address(server.server_address),
  283. " with SSL" if use_ssl else "")
  284. if not servers:
  285. raise RuntimeError("No servers started")
  286. # Mainloop
  287. select_timeout = None
  288. if sys.platform == "win32":
  289. # Fallback to busy waiting. (select(...) blocks SIGINT on Windows.)
  290. select_timeout = 1.0
  291. max_connections: int = configuration.get("server", "max_connections")
  292. logger.info("Radicale server ready")
  293. while True:
  294. rlist: List[socket.socket] = []
  295. # Wait for finished clients
  296. for server in servers.values():
  297. rlist.extend(server.worker_sockets)
  298. # Accept new connections if max_connections is not reached
  299. if max_connections <= 0 or len(rlist) < max_connections:
  300. rlist.extend(servers)
  301. # Use socket to get notified of program shutdown
  302. if shutdown_socket is not None:
  303. rlist.append(shutdown_socket)
  304. rlist, _, _ = select.select(rlist, [], [], select_timeout)
  305. rset = set(rlist)
  306. if shutdown_socket in rset:
  307. logger.info("Stopping Radicale")
  308. break
  309. for server in servers.values():
  310. finished_sockets = server.worker_sockets.intersection(rset)
  311. for s in finished_sockets:
  312. s.close()
  313. server.worker_sockets.remove(s)
  314. rset.remove(s)
  315. if finished_sockets:
  316. server.service_actions()
  317. if rset:
  318. active_server = servers.get(rset.pop())
  319. if active_server:
  320. active_server.handle_request()
  321. finally:
  322. # Wait for clients to finish and close servers
  323. for server in servers.values():
  324. for s in server.worker_sockets:
  325. s.recv(1)
  326. s.close()
  327. server.server_close()