Răsfoiți Sursa

Refactor: Extract class FileBackedRwLock

Unrud 8 ani în urmă
părinte
comite
16abbd9ea6
1 a modificat fișierele cu 89 adăugiri și 52 ștergeri
  1. 89 52
      radicale/storage.py

+ 89 - 52
radicale/storage.py

@@ -1379,7 +1379,7 @@ class Collection(BaseCollection):
 
     def get_meta(self, key=None):
         # reuse cached value if the storage is read-only
-        if self._writer or self._meta_cache is None:
+        if self._lock.locked() == "w" or self._meta_cache is None:
             try:
                 try:
                     with open(self._props_path, encoding=self._encoding) as f:
@@ -1408,56 +1408,101 @@ class Collection(BaseCollection):
     @property
     def etag(self):
         # reuse cached value if the storage is read-only
-        if self._writer or self._etag_cache is None:
+        if self._lock.locked() == "w" or self._etag_cache is None:
             self._etag_cache = super().etag
         return self._etag_cache
 
-    _lock = threading.Lock()
-    _waiters = []
-    _lock_file = None
-    _lock_file_locked = False
-    _readers = 0
-    _writer = False
+    _lock = None
 
     @classmethod
     @contextmanager
     def acquire_lock(cls, mode, user=None):
+        folder = os.path.expanduser(cls.configuration.get(
+            "storage", "filesystem_folder"))
+        if not cls._lock:
+            cls._makedirs_synced(folder)
+            lock_path = None
+            if cls.configuration.getboolean("storage", "filesystem_locking"):
+                lock_path = os.path.join(folder, ".Radicale.lock")
+            close_lock_file = cls.configuration.getboolean(
+                "storage", "filesystem_close_lock_file")
+            cls._lock = FileBackedRwLock(lock_path, close_lock_file)
+        with cls._lock.acquire_lock(mode):
+            yield
+            # execute hook
+            hook = cls.configuration.get("storage", "hook")
+            if mode == "w" and hook:
+                cls.logger.debug("Running hook")
+                subprocess.check_call(
+                    hook % {"user": shlex.quote(user or "Anonymous")},
+                    shell=True, cwd=folder)
+
+
+class FileBackedRwLock:
+    """A readers-Writer lock that can additionally lock a file.
+
+    All requests are processed in FIFO order.
+
+    """
+
+    def __init__(self, path=None, close_lock_file=True):
+        """Initilize a lock.
+
+        ``path`` the file that is used for locking (optional)
+
+        ``close_lock_file`` close the lock file, when unlocked and no requests
+        are pending
+
+        """
+        self._path = path
+        self._close_lock_file = close_lock_file
+
+        self._lock = threading.Lock()
+        self._waiters = []
+        self._lock_file = None
+        self._lock_file_locked = False
+        self._readers = 0
+        self._writer = False
+
+    def locked(self):
+        if self._writer:
+            return "w"
+        if self._readers:
+            return "r"
+        return ""
+
+    @contextmanager
+    def acquire_lock(self, mode):
         def condition():
             if mode == "r":
-                return not cls._writer
+                return not self._writer
             else:
-                return not cls._writer and cls._readers == 0
+                return not self._writer and self._readers == 0
 
-        file_locking = cls.configuration.getboolean("storage",
-                                                    "filesystem_locking")
-        folder = os.path.expanduser(cls.configuration.get(
-            "storage", "filesystem_folder"))
         # Use a primitive lock which only works within one process as a
         # precondition for inter-process file-based locking
-        with cls._lock:
-            if cls._waiters or not condition():
+        with self._lock:
+            if self._waiters or not condition():
                 # Use FIFO for access requests
-                waiter = threading.Condition(lock=cls._lock)
-                cls._waiters.append(waiter)
+                waiter = threading.Condition(lock=self._lock)
+                self._waiters.append(waiter)
                 while True:
                     waiter.wait()
                     if condition():
                         break
-                cls._waiters.pop(0)
+                self._waiters.pop(0)
             if mode == "r":
-                cls._readers += 1
+                self._readers += 1
                 # Notify additional potential readers
-                if cls._waiters:
-                    cls._waiters[0].notify()
+                if self._waiters:
+                    self._waiters[0].notify()
             else:
-                cls._writer = True
-            if not cls._lock_file:
-                cls._makedirs_synced(folder)
-                lock_path = os.path.join(folder, ".Radicale.lock")
-                cls._lock_file = open(lock_path, "w+")
-            if file_locking and not cls._lock_file_locked:
+                self._writer = True
+            if self._path and not self._lock_file_locked:
+                if not self._lock_file:
+                    self._lock_file = open(self._path, "w+")
                 if os.name == "nt":
-                    handle = msvcrt.get_osfhandle(cls._lock_file.fileno())
+                    handle = msvcrt.get_osfhandle(self._lock_file.fileno())
                     flags = LOCKFILE_EXCLUSIVE_LOCK if mode == "w" else 0
                     overlapped = Overlapped()
                     if not lock_file_ex(handle, flags, 0, 1, 0, overlapped):
@@ -1467,7 +1512,7 @@ class Collection(BaseCollection):
                 elif os.name == "posix":
                     _cmd = fcntl.LOCK_EX if mode == "w" else fcntl.LOCK_SH
                     try:
-                        fcntl.flock(cls._lock_file.fileno(), _cmd)
+                        fcntl.flock(self._lock_file.fileno(), _cmd)
                     except OSError as e:
                         raise RuntimeError("Locking the storage failed "
                                            "(can be disabled in the config): "
@@ -1476,43 +1521,35 @@ class Collection(BaseCollection):
                     raise RuntimeError("Locking the storage failed "
                                        "(can be disabled in the config): "
                                        "Unsupported operating system")
-                cls._lock_file_locked = True
+                self._lock_file_locked = True
         try:
             yield
-            # execute hook
-            hook = cls.configuration.get("storage", "hook")
-            if mode == "w" and hook:
-                cls.logger.debug("Running hook")
-                subprocess.check_call(
-                    hook % {"user": shlex.quote(user or "Anonymous")},
-                    shell=True, cwd=folder)
         finally:
-            with cls._lock:
+            with self._lock:
                 if mode == "r":
-                    cls._readers -= 1
+                    self._readers -= 1
                 else:
-                    cls._writer = False
-                if file_locking and cls._readers == 0:
+                    self._writer = False
+                if self._lock_file_locked and self._readers == 0:
                     if os.name == "nt":
-                        handle = msvcrt.get_osfhandle(cls._lock_file.fileno())
+                        handle = msvcrt.get_osfhandle(self._lock_file.fileno())
                         overlapped = Overlapped()
                         if not unlock_file_ex(handle, 0, 1, 0, overlapped):
                             raise RuntimeError("Unlocking the storage failed: "
                                                "%s" % ctypes.FormatError())
                     elif os.name == "posix":
                         try:
-                            fcntl.flock(cls._lock_file.fileno(), fcntl.LOCK_UN)
+                            fcntl.flock(self._lock_file.fileno(),
+                                        fcntl.LOCK_UN)
                         except OSError as e:
                             raise RuntimeError("Unlocking the storage failed: "
                                                "%s" % e) from e
                     else:
                         raise RuntimeError("Unlocking the storage failed: "
                                            "Unsupported operating system")
-                    cls._lock_file_locked = False
-                if cls._waiters:
-                    cls._waiters[0].notify()
-                if (cls.configuration.getboolean(
-                        "storage", "filesystem_close_lock_file") and
-                        cls._readers == 0 and not cls._waiters):
-                    cls._lock_file.close()
-                    cls._lock_file = None
+                    if self._close_lock_file and not self._waiters:
+                        self._lock_file.close()
+                        self._lock_file = None
+                    self._lock_file_locked = False
+                if self._waiters:
+                    self._waiters[0].notify()