Explorar el Código

Integrated server without threads and busy waiting

Unrud hace 10 años
padre
commit
bbe71c1ad1
Se han modificado 1 ficheros con 52 adiciones y 49 borrados
  1. 52 49
      radicale/__main__.py

+ 52 - 49
radicale/__main__.py

@@ -28,8 +28,9 @@ import atexit
 import os
 import sys
 import optparse
+import select
 import signal
-import threading
+import socket
 from wsgiref.simple_server import make_server
 
 from . import (
@@ -137,69 +138,71 @@ def run():
     atexit.register(cleanup)
     log.LOGGER.info("Starting Radicale")
 
+    log.LOGGER.debug(
+        "Base URL prefix: %s" % config.get("server", "base_prefix"))
+
     # Create collection servers
-    servers = []
+    servers = {}
     server_class = (
         HTTPSServer if config.getboolean("server", "ssl") else HTTPServer)
-    shutdown_program = threading.Event()
+    shutdown_program = [False]
 
     for host in config.get("server", "hosts").split(","):
         address, port = host.strip().rsplit(":", 1)
         address, port = address.strip("[] "), int(port)
-        servers.append(
-            make_server(address, port, Application(),
-                        server_class, RequestHandler))
-
-    # SIGTERM and SIGINT (aka KeyboardInterrupt) should just mark this for
-    # shutdown
-    signal.signal(signal.SIGTERM, lambda *_: shutdown_program.set())
-    signal.signal(signal.SIGINT, lambda *_: shutdown_program.set())
-
-    def serve_forever(server):
-        """Serve a server forever, cleanly shutdown when things go wrong."""
-        try:
-            server.serve_forever()
-        finally:
-            shutdown_program.set()
-
-    log.LOGGER.debug(
-        "Base URL prefix: %s" % config.get("server", "base_prefix"))
-
-    # Start the servers in a different loop to avoid possible race-conditions,
-    # when a server exists but another server is added to the list at the same
-    # time
-    for server in servers:
+        server = make_server(address, port, Application(),
+                             server_class, RequestHandler)
+        servers[server.socket] = server
         log.LOGGER.debug(
             "Listening to %s port %s" % (
                 server.server_name, server.server_port))
         if config.getboolean("server", "ssl"):
             log.LOGGER.debug("Using SSL")
-        threading.Thread(target=serve_forever, args=(server,)).start()
 
-    log.LOGGER.debug("Radicale server ready")
-
-    # Main loop: wait until all servers are exited
-    try:
-        # We must do the busy-waiting here, as all ``.join()`` calls completly
-        # block the thread, such that signals are not received
-        while True:
-            # The number is irrelevant, it only needs to be greater than 0.05
-            # due to python implementing its own busy-waiting logic
-            shutdown_program.wait(5.0)
-            if shutdown_program.is_set():
-                break
-    finally:
-        # Ignore signals, so that they cannot interfere
-        signal.signal(signal.SIGINT, signal.SIG_IGN)
-        signal.signal(signal.SIGTERM, signal.SIG_IGN)
+    # Create a socket pair to notify the select syscall of program shutdown
+    # This is not available in python < 3.5 on Windows
+    if hasattr(socket, "socketpair"):
+        shutdown_program_socket_in, shutdown_program_socket_out = \
+            socket.socketpair()
+    else:
+        shutdown_program_socket_in, shutdown_program_socket_out = None, None
 
+    # SIGTERM and SIGINT (aka KeyboardInterrupt) should just mark this for
+    # shutdown
+    def shutdown(*_):
+        if shutdown_program[0]:
+            # Ignore following signals
+            return
         log.LOGGER.info("Stopping Radicale")
-
-        for server in servers:
-            log.LOGGER.debug(
-                "Closing server listening to %s port %s" % (
-                    server.server_name, server.server_port))
-            server.shutdown()
+        shutdown_program[0] = True
+        if shutdown_program_socket_in:
+            shutdown_program_socket_in.sendall(b"goodbye")
+    signal.signal(signal.SIGTERM, shutdown)
+    signal.signal(signal.SIGINT, shutdown)
+
+    # Main loop: wait for requests on any of the servers or program shutdown
+    sockets = list(servers.keys())
+    if shutdown_program_socket_out:
+        # Use socket pair to get notified of program shutdown
+        sockets.append(shutdown_program_socket_out)
+        select_timeout = None
+    else:
+        # Fallback to busy waiting
+        select_timeout = 1.0
+    log.LOGGER.debug("Radicale server ready")
+    while not shutdown_program[0]:
+        try:
+            rlist, _, xlist = select.select(sockets, [], sockets,
+                                            select_timeout)
+        except (KeyboardInterrupt, select.error):
+            # SIGINT ist handled by signal handler above
+            rlist, _, xlist = [], [], []
+        if xlist:
+            raise RuntimeError("Unhandled socket error")
+        if rlist:
+            server = servers.get(rlist[0])
+            if server:
+                server.handle_request()
 
 # pylint: enable=R0912,R0914