|
|
@@ -524,7 +524,8 @@ class Collection(BaseCollection):
|
|
|
return "".join([item.serialize() for item in items])
|
|
|
return ""
|
|
|
|
|
|
- _lock = threading.Condition()
|
|
|
+ _lock = threading.Lock()
|
|
|
+ _waiters = []
|
|
|
_lock_file = None
|
|
|
_lock_file_locked = False
|
|
|
_readers = 0
|
|
|
@@ -544,11 +545,20 @@ class Collection(BaseCollection):
|
|
|
# Use a primitive lock which only works within one process as a
|
|
|
# precondition for inter-process file-based locking
|
|
|
with cls._lock:
|
|
|
- cls._lock.wait_for(condition)
|
|
|
+ if cls._waiters or not condition():
|
|
|
+ # use FIFO for access requests
|
|
|
+ waiter = threading.Condition(lock=cls._lock)
|
|
|
+ cls._waiters.append(waiter)
|
|
|
+ while True:
|
|
|
+ waiter.wait()
|
|
|
+ if condition():
|
|
|
+ break
|
|
|
+ cls._waiters.pop(0)
|
|
|
if mode == "r":
|
|
|
cls._readers += 1
|
|
|
# notify additional potential readers
|
|
|
- cls._lock.notify()
|
|
|
+ if cls._waiters:
|
|
|
+ cls._waiters[0].notify()
|
|
|
else:
|
|
|
cls._writer = True
|
|
|
if not cls._lock_file:
|
|
|
@@ -596,4 +606,5 @@ class Collection(BaseCollection):
|
|
|
except OSError:
|
|
|
cls.logger.debug("Unlocking not supported")
|
|
|
cls._lock_file_locked = False
|
|
|
- cls._lock.notify()
|
|
|
+ if cls._waiters:
|
|
|
+ cls._waiters[0].notify()
|