Merge "Updated cache module and its dependencies"

This commit is contained in:
Jenkins 2014-10-21 10:22:56 +00:00 committed by Gerrit Code Review
commit e808a43f5e
4 changed files with 136 additions and 125 deletions

View File

@ -14,9 +14,10 @@
import collections import collections
from oslo.utils import timeutils
from neutron.openstack.common.cache import backends from neutron.openstack.common.cache import backends
from neutron.openstack.common import lockutils from neutron.openstack.common import lockutils
from neutron.openstack.common import timeutils
class MemoryBackend(backends.BaseCache): class MemoryBackend(backends.BaseCache):
@ -147,7 +148,7 @@ class MemoryBackend(backends.BaseCache):
try: try:
# NOTE(flaper87): Keys with ttl == 0 # NOTE(flaper87): Keys with ttl == 0
# don't exist in the _keys_expires dict # don't exist in the _keys_expires dict
self._keys_expires[value[0]].remove(value[1]) self._keys_expires[value[0]].remove(key)
except (KeyError, ValueError): except (KeyError, ValueError):
pass pass

View File

@ -26,9 +26,9 @@ class BaseCache(object):
:params parsed_url: Parsed url object. :params parsed_url: Parsed url object.
:params options: A dictionary with configuration parameters :params options: A dictionary with configuration parameters
for the cache. For example: for the cache. For example:
- default_ttl: An integer defining the default ttl
for keys. - default_ttl: An integer defining the default ttl for keys.
""" """
def __init__(self, parsed_url, options=None): def __init__(self, parsed_url, options=None):
@ -43,20 +43,17 @@ class BaseCache(object):
def set(self, key, value, ttl, not_exists=False): def set(self, key, value, ttl, not_exists=False):
"""Sets or updates a cache entry """Sets or updates a cache entry
NOTE: Thread-safety is required and has to be .. note:: Thread-safety is required and has to be guaranteed by the
guaranteed by the backend implementation. backend implementation.
:params key: Item key as string. :params key: Item key as string.
:type key: `unicode string` :type key: `unicode string`
:params value: Value to assign to the key. This :params value: Value to assign to the key. This can be anything that
can be anything that is handled is handled by the current backend.
by the current backend. :params ttl: Key's timeout in seconds. 0 means no timeout.
:params ttl: Key's timeout in seconds. 0 means
no timeout.
:type ttl: int :type ttl: int
:params not_exists: If True, the key will be set :params not_exists: If True, the key will be set if it doesn't exist.
if it doesn't exist. Otherwise, Otherwise, it'll always be set.
it'll always be set.
:type not_exists: bool :type not_exists: bool
:returns: True if the operation succeeds, False otherwise. :returns: True if the operation succeeds, False otherwise.
@ -74,9 +71,8 @@ class BaseCache(object):
:params key: Item key as string. :params key: Item key as string.
:type key: `unicode string` :type key: `unicode string`
:params value: Value to assign to the key. This :params value: Value to assign to the key. This can be anything that
can be anything that is handled is handled by the current backend.
by the current backend.
""" """
try: try:
return self[key] return self[key]
@ -91,15 +87,14 @@ class BaseCache(object):
def get(self, key, default=None): def get(self, key, default=None):
"""Gets one item from the cache """Gets one item from the cache
NOTE: Thread-safety is required and it has to be .. note:: Thread-safety is required and it has to be guaranteed
guaranteed by the backend implementation. by the backend implementation.
:params key: Key for the item to retrieve :params key: Key for the item to retrieve from the cache.
from the cache.
:params default: The default value to return. :params default: The default value to return.
:returns: `key`'s value in the cache if it exists, :returns: `key`'s value in the cache if it exists, otherwise
otherwise `default` should be returned. `default` should be returned.
""" """
return self._get(key, default) return self._get(key, default)
@ -115,8 +110,8 @@ class BaseCache(object):
def __delitem__(self, key): def __delitem__(self, key):
"""Removes an item from cache. """Removes an item from cache.
NOTE: Thread-safety is required and it has to be .. note:: Thread-safety is required and it has to be guaranteed by
guaranteed by the backend implementation. the backend implementation.
:params key: The key to remove. :params key: The key to remove.
@ -130,8 +125,8 @@ class BaseCache(object):
def clear(self): def clear(self):
"""Removes all items from the cache. """Removes all items from the cache.
NOTE: Thread-safety is required and it has to be .. note:: Thread-safety is required and it has to be guaranteed by
guaranteed by the backend implementation. the backend implementation.
""" """
return self._clear() return self._clear()
@ -143,9 +138,8 @@ class BaseCache(object):
"""Increments the value for a key """Increments the value for a key
:params key: The key for the value to be incremented :params key: The key for the value to be incremented
:params delta: Number of units by which to increment :params delta: Number of units by which to increment the value.
the value. Pass a negative number to Pass a negative number to decrement the value.
decrement the value.
:returns: The new value :returns: The new value
""" """
@ -158,10 +152,8 @@ class BaseCache(object):
def append_tail(self, key, tail): def append_tail(self, key, tail):
"""Appends `tail` to `key`'s value. """Appends `tail` to `key`'s value.
:params key: The key of the value to which :params key: The key of the value to which `tail` should be appended.
`tail` should be appended. :params tail: The list of values to append to the original.
:params tail: The list of values to append to the
original.
:returns: The new value :returns: The new value
""" """
@ -181,10 +173,8 @@ class BaseCache(object):
def append(self, key, value): def append(self, key, value):
"""Appends `value` to `key`'s value. """Appends `value` to `key`'s value.
:params key: The key of the value to which :params key: The key of the value to which `tail` should be appended.
`tail` should be appended. :params value: The value to append to the original.
:params value: The value to append to the
original.
:returns: The new value :returns: The new value
""" """
@ -196,8 +186,7 @@ class BaseCache(object):
:params key: The key to verify. :params key: The key to verify.
:returns: True if the key exists, :returns: True if the key exists, otherwise False.
otherwise False.
""" """
@abc.abstractmethod @abc.abstractmethod
@ -209,9 +198,8 @@ class BaseCache(object):
"""Gets keys' value from cache """Gets keys' value from cache
:params keys: List of keys to retrieve. :params keys: List of keys to retrieve.
:params default: The default value to return :params default: The default value to return for each key that is not
for each key that is not in in the cache.
the cache.
:returns: A generator of (key, value) :returns: A generator of (key, value)
""" """
@ -227,13 +215,12 @@ class BaseCache(object):
def set_many(self, data, ttl=None): def set_many(self, data, ttl=None):
"""Puts several items into the cache at once """Puts several items into the cache at once
Depending on the backend, this operation may or may Depending on the backend, this operation may or may not be efficient.
not be efficient. The default implementation calls The default implementation calls set for each (key, value) pair
set for each (key, value) pair passed, other backends passed, other backends support set_many operations as part of their
support set_many operations as part of their protocols. protocols.
:params data: A dictionary like {key: val} to store :params data: A dictionary like {key: val} to store in the cache.
in the cache.
:params ttl: Key's timeout in seconds. :params ttl: Key's timeout in seconds.
""" """

View File

@ -24,7 +24,7 @@ from six.moves.urllib import parse
from stevedore import driver from stevedore import driver
def _get_olso_configs(): def _get_oslo_configs():
"""Returns the oslo.config options to register.""" """Returns the oslo.config options to register."""
# NOTE(flaper87): Oslo config should be # NOTE(flaper87): Oslo config should be
# optional. Instead of doing try / except # optional. Instead of doing try / except
@ -45,7 +45,7 @@ def register_oslo_configs(conf):
:params conf: Config object. :params conf: Config object.
:type conf: `cfg.ConfigOptions` :type conf: `cfg.ConfigOptions`
""" """
conf.register_opts(_get_olso_configs()) conf.register_opts(_get_oslo_configs())
def get_cache(url='memory://'): def get_cache(url='memory://'):

View File

@ -13,10 +13,10 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import contextlib import contextlib
import errno import errno
import functools import functools
import logging
import os import os
import shutil import shutil
import subprocess import subprocess
@ -29,9 +29,7 @@ import weakref
from oslo.config import cfg from oslo.config import cfg
from neutron.openstack.common import fileutils from neutron.openstack.common import fileutils
from neutron.openstack.common.gettextutils import _ from neutron.openstack.common._i18n import _, _LE, _LI
from neutron.openstack.common import local
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -39,10 +37,10 @@ LOG = logging.getLogger(__name__)
util_opts = [ util_opts = [
cfg.BoolOpt('disable_process_locking', default=False, cfg.BoolOpt('disable_process_locking', default=False,
help='Whether to disable inter-process locks'), help='Enables or disables inter-process locks.'),
cfg.StrOpt('lock_path', cfg.StrOpt('lock_path',
default=os.environ.get("NEUTRON_LOCK_PATH"), default=os.environ.get("NEUTRON_LOCK_PATH"),
help=('Directory to use for lock files.')) help='Directory to use for lock files.')
] ]
@ -54,7 +52,7 @@ def set_defaults(lock_path):
cfg.set_defaults(util_opts, lock_path=lock_path) cfg.set_defaults(util_opts, lock_path=lock_path)
class _InterProcessLock(object): class _FileLock(object):
"""Lock implementation which allows multiple locks, working around """Lock implementation which allows multiple locks, working around
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
not require any cleanup. Since the lock is always held on a file not require any cleanup. Since the lock is always held on a file
@ -76,7 +74,13 @@ class _InterProcessLock(object):
self.lockfile = None self.lockfile = None
self.fname = name self.fname = name
def __enter__(self): def acquire(self):
basedir = os.path.dirname(self.fname)
if not os.path.exists(basedir):
fileutils.ensure_tree(basedir)
LOG.info(_LI('Created lock path: %s'), basedir)
self.lockfile = open(self.fname, 'w') self.lockfile = open(self.fname, 'w')
while True: while True:
@ -86,23 +90,39 @@ class _InterProcessLock(object):
# Also upon reading the MSDN docs for locking(), it seems # Also upon reading the MSDN docs for locking(), it seems
# to have a laughable 10 attempts "blocking" mechanism. # to have a laughable 10 attempts "blocking" mechanism.
self.trylock() self.trylock()
return self LOG.debug('Got file lock "%s"', self.fname)
return True
except IOError as e: except IOError as e:
if e.errno in (errno.EACCES, errno.EAGAIN): if e.errno in (errno.EACCES, errno.EAGAIN):
# external locks synchronise things like iptables # external locks synchronise things like iptables
# updates - give it some time to prevent busy spinning # updates - give it some time to prevent busy spinning
time.sleep(0.01) time.sleep(0.01)
else: else:
raise raise threading.ThreadError(_("Unable to acquire lock on"
" `%(filename)s` due to"
" %(exception)s") %
{'filename': self.fname,
'exception': e})
def __exit__(self, exc_type, exc_val, exc_tb): def __enter__(self):
self.acquire()
return self
def release(self):
try: try:
self.unlock() self.unlock()
self.lockfile.close() self.lockfile.close()
LOG.debug('Released file lock "%s"', self.fname)
except IOError: except IOError:
LOG.exception(_("Could not release the acquired lock `%s`"), LOG.exception(_LE("Could not release the acquired lock `%s`"),
self.fname) self.fname)
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
def exists(self):
return os.path.exists(self.fname)
def trylock(self): def trylock(self):
raise NotImplementedError() raise NotImplementedError()
@ -110,7 +130,7 @@ class _InterProcessLock(object):
raise NotImplementedError() raise NotImplementedError()
class _WindowsLock(_InterProcessLock): class _WindowsLock(_FileLock):
def trylock(self): def trylock(self):
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1) msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1)
@ -118,7 +138,7 @@ class _WindowsLock(_InterProcessLock):
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1) msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1)
class _PosixLock(_InterProcessLock): class _FcntlLock(_FileLock):
def trylock(self): def trylock(self):
fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB) fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
@ -131,12 +151,63 @@ if os.name == 'nt':
InterProcessLock = _WindowsLock InterProcessLock = _WindowsLock
else: else:
import fcntl import fcntl
InterProcessLock = _PosixLock InterProcessLock = _FcntlLock
_semaphores = weakref.WeakValueDictionary() _semaphores = weakref.WeakValueDictionary()
_semaphores_lock = threading.Lock() _semaphores_lock = threading.Lock()
def _get_lock_path(name, lock_file_prefix, lock_path=None):
# NOTE(mikal): the lock name cannot contain directory
# separators
name = name.replace(os.sep, '_')
if lock_file_prefix:
sep = '' if lock_file_prefix.endswith('-') else '-'
name = '%s%s%s' % (lock_file_prefix, sep, name)
local_lock_path = lock_path or CONF.lock_path
if not local_lock_path:
raise cfg.RequiredOptError('lock_path')
return os.path.join(local_lock_path, name)
def external_lock(name, lock_file_prefix=None, lock_path=None):
LOG.debug('Attempting to grab external lock "%(lock)s"',
{'lock': name})
lock_file_path = _get_lock_path(name, lock_file_prefix, lock_path)
return InterProcessLock(lock_file_path)
def remove_external_lock_file(name, lock_file_prefix=None):
"""Remove an external lock file when it's not used anymore
This will be helpful when we have a lot of lock files
"""
with internal_lock(name):
lock_file_path = _get_lock_path(name, lock_file_prefix)
try:
os.remove(lock_file_path)
except OSError:
LOG.info(_LI('Failed to remove file %(file)s'),
{'file': lock_file_path})
def internal_lock(name):
with _semaphores_lock:
try:
sem = _semaphores[name]
LOG.debug('Using existing semaphore "%s"', name)
except KeyError:
sem = threading.Semaphore()
_semaphores[name] = sem
LOG.debug('Created new semaphore "%s"', name)
return sem
@contextlib.contextmanager @contextlib.contextmanager
def lock(name, lock_file_prefix=None, external=False, lock_path=None): def lock(name, lock_file_prefix=None, external=False, lock_path=None):
"""Context based lock """Context based lock
@ -152,67 +223,19 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None):
should work across multiple processes. This means that if two different should work across multiple processes. This means that if two different
workers both run a method decorated with @synchronized('mylock', workers both run a method decorated with @synchronized('mylock',
external=True), only one of them will execute at a time. external=True), only one of them will execute at a time.
:param lock_path: The lock_path keyword argument is used to specify a
special location for external lock files to live. If nothing is set, then
CONF.lock_path is used as a default.
""" """
with _semaphores_lock: int_lock = internal_lock(name)
try: with int_lock:
sem = _semaphores[name] LOG.debug('Acquired semaphore "%(lock)s"', {'lock': name})
except KeyError:
sem = threading.Semaphore()
_semaphores[name] = sem
with sem:
LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name})
# NOTE(mikal): I know this looks odd
if not hasattr(local.strong_store, 'locks_held'):
local.strong_store.locks_held = []
local.strong_store.locks_held.append(name)
try: try:
if external and not CONF.disable_process_locking: if external and not CONF.disable_process_locking:
LOG.debug(_('Attempting to grab file lock "%(lock)s"'), ext_lock = external_lock(name, lock_file_prefix, lock_path)
{'lock': name}) with ext_lock:
yield ext_lock
# We need a copy of lock_path because it is non-local
local_lock_path = lock_path or CONF.lock_path
if not local_lock_path:
raise cfg.RequiredOptError('lock_path')
if not os.path.exists(local_lock_path):
fileutils.ensure_tree(local_lock_path)
LOG.info(_('Created lock path: %s'), local_lock_path)
def add_prefix(name, prefix):
if not prefix:
return name
sep = '' if prefix.endswith('-') else '-'
return '%s%s%s' % (prefix, sep, name)
# NOTE(mikal): the lock name cannot contain directory
# separators
lock_file_name = add_prefix(name.replace(os.sep, '_'),
lock_file_prefix)
lock_file_path = os.path.join(local_lock_path, lock_file_name)
try:
lock = InterProcessLock(lock_file_path)
with lock as lock:
LOG.debug(_('Got file lock "%(lock)s" at %(path)s'),
{'lock': name, 'path': lock_file_path})
yield lock
finally:
LOG.debug(_('Released file lock "%(lock)s" at %(path)s'),
{'lock': name, 'path': lock_file_path})
else: else:
yield sem yield int_lock
finally: finally:
local.strong_store.locks_held.remove(name) LOG.debug('Releasing semaphore "%(lock)s"', {'lock': name})
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None): def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
@ -244,11 +267,11 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
def inner(*args, **kwargs): def inner(*args, **kwargs):
try: try:
with lock(name, lock_file_prefix, external, lock_path): with lock(name, lock_file_prefix, external, lock_path):
LOG.debug(_('Got semaphore / lock "%(function)s"'), LOG.debug('Got semaphore / lock "%(function)s"',
{'function': f.__name__}) {'function': f.__name__})
return f(*args, **kwargs) return f(*args, **kwargs)
finally: finally:
LOG.debug(_('Semaphore / lock released "%(function)s"'), LOG.debug('Semaphore / lock released "%(function)s"',
{'function': f.__name__}) {'function': f.__name__})
return inner return inner
return wrap return wrap