diff --git a/doc/source/user/index.rst b/doc/source/user/index.rst index 1867faf..8f0d83e 100644 --- a/doc/source/user/index.rst +++ b/doc/source/user/index.rst @@ -47,6 +47,32 @@ sure that the names of the locks used are carefully chosen (typically by namespacing them to your app so that other apps will not chose the same names). +Enabling fair locking +===================== + +By default there is no requirement that the lock is ``fair``. That is, it's +possible for a thread to block waiting for the lock, then have another thread +block waiting for the lock, and when the lock is released by the current owner +the second waiter could acquire the lock before the first. In an extreme case +you could have a whole string of other threads acquire the lock before the +first waiter acquires it, resulting in unpredictable amounts of latency. + +For cases where this is a problem, it's possible to specify the use of fair +locks:: + + @lockutils.synchronized('not_thread_process_safe', fair=True) + def not_thread_process_safe(): + pass + +When using fair locks the lock itself is slightly more expensive (which +shouldn't matter in most cases), but it will ensure that all threads that +block waiting for the lock will acquire it in the order that they blocked. + +The exception to this is when specifying both ``external`` and ``fair`` +locks. In this case, the ordering *within* a given process will be fair, but +the ordering *between* processes will be determined by the behaviour of the +underlying OS. + Common ways to prefix/namespace the synchronized decorator ========================================================== diff --git a/oslo_concurrency/lockutils.py b/oslo_concurrency/lockutils.py index e896a1a..edec8f9 100644 --- a/oslo_concurrency/lockutils.py +++ b/oslo_concurrency/lockutils.py @@ -87,6 +87,49 @@ ReaderWriterLock = fasteners.ReaderWriterLock """ +class FairLocks(object): + """A garbage collected container of fair locks. + + With a fair lock, contending lockers will get the lock in the order in + which they tried to acquire it. + + This collection internally uses a weak value dictionary so that when a + lock is no longer in use (by any threads) it will automatically be + removed from this container by the garbage collector. + """ + + def __init__(self): + self._locks = weakref.WeakValueDictionary() + self._lock = threading.Lock() + + def get(self, name): + """Gets (or creates) a lock with a given name. + + :param name: The lock name to get/create (used to associate + previously created names with the same lock). + + Returns an newly constructed lock (or an existing one if it was + already created for the given name). + """ + with self._lock: + try: + return self._locks[name] + except KeyError: + # The fasteners module specifies that + # ReaderWriterLock.write_lock() will give FIFO behaviour, + # so we don't need to do anything special ourselves. + rwlock = ReaderWriterLock() + self._locks[name] = rwlock + return rwlock + + +_fair_locks = FairLocks() + + +def internal_fair_lock(name): + return _fair_locks.get(name) + + class Semaphores(object): """A garbage collected container of semaphores. @@ -170,7 +213,7 @@ def internal_lock(name, semaphores=None): @contextlib.contextmanager def lock(name, lock_file_prefix=None, external=False, lock_path=None, - do_log=True, semaphores=None, delay=0.01): + do_log=True, semaphores=None, delay=0.01, fair=False): """Context based lock This function yields a `threading.Semaphore` instance (if we don't use @@ -200,16 +243,26 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None, :param delay: Delay between acquisition attempts (in seconds). + :param fair: Whether or not we want a "fair" lock where contending lockers + will get the lock in the order in which they tried to acquire it. + .. versionchanged:: 0.2 Added *do_log* optional parameter. .. versionchanged:: 0.3 Added *delay* and *semaphores* optional parameters. """ - int_lock = internal_lock(name, semaphores=semaphores) + if fair: + if semaphores is not None: + raise NotImplementedError(_('Specifying semaphores is not ' + 'supported when using fair locks.')) + # The fastners module specifies that write_lock() provides fairness. + int_lock = internal_fair_lock(name).write_lock() + else: + int_lock = internal_lock(name, semaphores=semaphores) with int_lock: if do_log: - LOG.debug('Acquired semaphore "%(lock)s"', {'lock': name}) + LOG.debug('Acquired lock "%(lock)s"', {'lock': name}) try: if external and not CONF.oslo_concurrency.disable_process_locking: ext_lock = external_lock(name, lock_file_prefix, lock_path) @@ -225,11 +278,11 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None, yield int_lock finally: if do_log: - LOG.debug('Releasing semaphore "%(lock)s"', {'lock': name}) + LOG.debug('Releasing lock "%(lock)s"', {'lock': name}) def synchronized(name, lock_file_prefix=None, external=False, lock_path=None, - semaphores=None, delay=0.01): + semaphores=None, delay=0.01, fair=False): """Synchronization decorator. Decorating a method like so:: @@ -264,7 +317,8 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None, t2 = None try: with lock(name, lock_file_prefix, external, lock_path, - do_log=False, semaphores=semaphores, delay=delay): + do_log=False, semaphores=semaphores, delay=delay, + fair=fair): t2 = timeutils.now() LOG.debug('Lock "%(name)s" acquired by "%(function)s" :: ' 'waited %(wait_secs)0.3fs', diff --git a/oslo_concurrency/tests/unit/test_lockutils.py b/oslo_concurrency/tests/unit/test_lockutils.py index 683d89f..a2ce7b7 100644 --- a/oslo_concurrency/tests/unit/test_lockutils.py +++ b/oslo_concurrency/tests/unit/test_lockutils.py @@ -147,6 +147,45 @@ class LockTestCase(test_base.BaseTestCase): self.assertEqual(saved_sem_num, len(lockutils._semaphores), "Semaphore leak detected") + def test_lock_internal_fair(self): + """Check that we're actually fair.""" + + def f(_id): + with lockutils.lock('testlock', 'test-', + external=False, fair=True): + lock_holder.append(_id) + + lock_holder = [] + threads = [] + # While holding the fair lock, spawn a bunch of threads that all try + # to acquire the lock. They will all block. Then release the lock + # and see what happens. + with lockutils.lock('testlock', 'test-', external=False, fair=True): + for i in range(10): + thread = threading.Thread(target=f, args=(i,)) + threads.append(thread) + thread.start() + # Allow some time for the new thread to get queued onto the + # list of pending writers before continuing. This is gross + # but there's no way around it without using knowledge of + # fasteners internals. + time.sleep(0.5) + # Wait for all threads. + for thread in threads: + thread.join() + + self.assertEqual(10, len(lock_holder)) + # Check that the threads each got the lock in fair order. + for i in range(10): + self.assertEqual(i, lock_holder[i]) + + def test_fair_lock_with_semaphore(self): + def do_test(): + s = lockutils.Semaphores() + with lockutils.lock('testlock', 'test-', semaphores=s, fair=True): + pass + self.assertRaises(NotImplementedError, do_test) + def test_nested_synchronized_external_works(self): """We can nest external syncs.""" tempdir = tempfile.mkdtemp() diff --git a/releasenotes/notes/add-option-for-fair-locks-b6d660e97683cec6.yaml b/releasenotes/notes/add-option-for-fair-locks-b6d660e97683cec6.yaml new file mode 100644 index 0000000..ad1cd4b --- /dev/null +++ b/releasenotes/notes/add-option-for-fair-locks-b6d660e97683cec6.yaml @@ -0,0 +1,15 @@ +--- +prelude: > + This release includes optional support for fair locks. When fair locks + are specified, blocking waiters will acquire the lock in the order that + they blocked. +features: + - | + We now have optional support for ``fair`` locks. When fair locks are + specified, blocking waiters will acquire the lock in the order that they + blocked. This can be useful to ensure that existing blocked waiters do + not wait indefinitely in the face of large numbers of new attempts to + acquire the lock. When specifying locks as both ``external`` and ``fair``, + the ordering *within* a given process will be fair, but the ordering + *between* processes will be determined by the behaviour of the underlying + OS.