9d3ca49387
Signed-off-by: Dean Troyer <dtroyer@gmail.com>
107 lines
3.9 KiB
Diff
107 lines
3.9 KiB
Diff
diff --git a/oslo_concurrency/lockutils.py b/oslo_concurrency/lockutils.py
|
|
index ea67571..4519463 100644
|
|
--- a/oslo_concurrency/lockutils.py
|
|
+++ b/oslo_concurrency/lockutils.py
|
|
@@ -87,6 +87,43 @@ ReaderWriterLock = fasteners.ReaderWriterLock
|
|
"""
|
|
|
|
|
|
+class FairLocks(object):
|
|
+ """A garbage collected container of fair locks.
|
|
+
|
|
+ This collection internally uses a weak value dictionary so that when a
|
|
+ lock is no longer in use (by any threads) it will automatically be
|
|
+ removed from this container by the garbage collector.
|
|
+ """
|
|
+
|
|
+ def __init__(self):
|
|
+ self._locks = weakref.WeakValueDictionary()
|
|
+ self._lock = threading.Lock()
|
|
+
|
|
+ def get(self, name):
|
|
+ """Gets (or creates) a lock with a given name.
|
|
+
|
|
+ :param name: The lock name to get/create (used to associate
|
|
+ previously created names with the same lock).
|
|
+
|
|
+ Returns a newly constructed lock (or an existing one if it was
|
|
+ already created for the given name).
|
|
+ """
|
|
+ with self._lock:
|
|
+ try:
|
|
+ return self._locks[name]
|
|
+ except KeyError:
|
|
+ rwlock = ReaderWriterLock()
|
|
+ self._locks[name] = rwlock
|
|
+ return rwlock
|
|
+
|
|
+
|
|
+fair_locks = FairLocks()
|
|
+
|
|
+
|
|
+def internal_fair_lock(name):
|
|
+ return fair_locks.get(name)
|
|
+
|
|
+
|
|
class Semaphores(object):
|
|
"""A garbage collected container of semaphores.
|
|
|
|
@@ -170,7 +207,7 @@ def internal_lock(name, semaphores=None):
|
|
|
|
@contextlib.contextmanager
|
|
def lock(name, lock_file_prefix=None, external=False, lock_path=None,
|
|
- do_log=True, semaphores=None, delay=0.01):
|
|
+ do_log=True, semaphores=None, delay=0.01, fair=False):
|
|
"""Context based lock
|
|
|
|
This function yields a `threading.Semaphore` instance (if we don't use
|
|
@@ -200,16 +237,22 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None,
|
|
|
|
:param delay: Delay between acquisition attempts (in seconds).
|
|
|
|
+ :param fair: Whether or not we want a "fair" lock where contending lockers
|
|
+ will get the lock in the order in which they tried to acquire it.
|
|
+
|
|
.. versionchanged:: 0.2
|
|
Added *do_log* optional parameter.
|
|
|
|
.. versionchanged:: 0.3
|
|
Added *delay* and *semaphores* optional parameters.
|
|
"""
|
|
- int_lock = internal_lock(name, semaphores=semaphores)
|
|
+ if fair:
|
|
+ int_lock = internal_fair_lock(name).write_lock()
|
|
+ else:
|
|
+ int_lock = internal_lock(name, semaphores=semaphores)
|
|
with int_lock:
|
|
if do_log:
|
|
- LOG.debug('Acquired semaphore "%(lock)s"', {'lock': name})
|
|
+ LOG.debug('Acquired lock "%(lock)s"', {'lock': name})
|
|
try:
|
|
if external and not CONF.oslo_concurrency.disable_process_locking:
|
|
ext_lock = external_lock(name, lock_file_prefix, lock_path)
|
|
@@ -222,11 +265,11 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None,
|
|
yield int_lock
|
|
finally:
|
|
if do_log:
|
|
- LOG.debug('Releasing semaphore "%(lock)s"', {'lock': name})
|
|
+ LOG.debug('Releasing lock "%(lock)s"', {'lock': name})
|
|
|
|
|
|
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None,
|
|
- semaphores=None, delay=0.01):
|
|
+ semaphores=None, delay=0.01, fair=False):
|
|
"""Synchronization decorator.
|
|
|
|
Decorating a method like so::
|
|
@@ -261,7 +304,8 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None,
|
|
t2 = None
|
|
try:
|
|
with lock(name, lock_file_prefix, external, lock_path,
|
|
- do_log=False, semaphores=semaphores, delay=delay):
|
|
+ do_log=False, semaphores=semaphores, delay=delay,
|
|
+ fair=fair):
|
|
t2 = timeutils.now()
|
|
LOG.debug('Lock "%(name)s" acquired by "%(function)s" :: '
|
|
'waited %(wait_secs)0.3fs',
|