Merge "Switch to oslo_cache"
This commit is contained in:
commit
dc11333b56

openstack-common.conf
@@ -1,10 +0,0 @@
-[DEFAULT]
-module=cache
-module=excutils
-module=fileutils
-module=gettextutils
-module=lockutils
-module=timeutils
-
-# Base module
-base=zaqar

requirements.txt
@@ -10,11 +10,11 @@ jsonschema!=2.5.0,<3.0.0,>=2.0.0
 iso8601>=0.1.9
 keystonemiddleware>=2.0.0
 msgpack-python>=0.4.0
 posix-ipc
-python-memcached>=1.56
 WebOb>=1.2.3
 stevedore>=1.5.0 # Apache-2.0
 six>=1.9.0
+oslo.cache>=0.4.0 # Apache-2.0
 oslo.config>=2.3.0 # Apache-2.0
 oslo.context>=0.2.0 # Apache-2.0
 oslo.i18n>=1.5.0 # Apache-2.0

setup.cfg
@@ -49,9 +49,6 @@ zaqar.transport =
     wsgi = zaqar.transport.wsgi.driver:Driver
     websocket = zaqar.transport.websocket.driver:Driver
 
-zaqar.openstack.common.cache.backends =
-    memory = zaqar.openstack.common.cache._backends.memory:MemoryBackend
-
 oslo.config.opts =
     zaqar.common.configs = zaqar.common.configs:_config_options
     zaqar.storage.pipeline = zaqar.storage.pipeline:_config_options

zaqar/bootstrap.py
@@ -18,10 +18,10 @@ from oslo_log import log
 from stevedore import driver
 
 from zaqar.api import handler
+from zaqar.common import cache as oslo_cache
 from zaqar.common import configs
 from zaqar.common import decorators
 from zaqar.common import errors
-from zaqar.openstack.common.cache import cache as oslo_cache
 from zaqar.storage import pipeline
 from zaqar.storage import pooling
 from zaqar.storage import utils as storage_utils
@@ -93,9 +93,8 @@ class Bootstrap(object):
     def cache(self):
         LOG.debug(u'Loading proxy cache driver')
         try:
-            oslo_cache.register_oslo_configs(self.conf)
-            mgr = oslo_cache.get_cache(self.conf.cache_url)
-            return mgr
+            oslo_cache.register_config(self.conf)
+            return oslo_cache.get_cache(self.conf)
         except RuntimeError as exc:
             LOG.exception(exc)
             raise errors.InvalidDriver(exc)
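
Note: the old driver was configured through the removed `cache_url` option
(default `memory://`); oslo.cache is configured through its own `[cache]`
option group instead. A minimal sketch of the corresponding configuration,
assuming oslo.cache's standard option names (the values are illustrative,
not part of this change):

    [cache]
    enabled = True
    backend = dogpile.cache.memory
    expiration_time = 600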

zaqar/common/cache.py
@@ -1,3 +1,4 @@
+# Copyright 2015 Red Hat, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License"); you may
 # not use this file except in compliance with the License. You may obtain
@@ -11,7 +12,13 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-import six
+from oslo_cache import core
 
 
-six.add_move(six.MovedModule('mox', 'mox', 'mox3.mox'))
+def register_config(conf):
+    core.configure(conf)
+
+
+def get_cache(conf):
+    region = core.create_region()
+    return core.configure_cache_region(conf, region)
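
For context, a minimal standalone sketch of how these two helpers can be
driven (the option overrides and the in-memory dogpile backend are
illustrative, not part of this change):

    from oslo_config import cfg

    from zaqar.common import cache as oslo_cache

    conf = cfg.ConfigOpts()
    oslo_cache.register_config(conf)  # registers oslo.cache's [cache] options
    conf.set_override('enabled', True, group='cache')
    conf.set_override('backend', 'dogpile.cache.memory', group='cache')

    cache = oslo_cache.get_cache(conf)  # a configured dogpile.cache region
    cache.set('greeting', b'hello')
    cache.get('greeting')  # b'hello'
    cache.get('missing')   # the NO_VALUE sentinel from oslo_cache.core, not None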

zaqar/common/decorators.py
@@ -16,6 +16,7 @@
 import functools
 
 import msgpack
+from oslo_cache import core
 from oslo_log import log as logging
 from oslo_serialization import jsonutils
 
@@ -115,7 +116,7 @@ def caches(keygen, ttl, cond=None):
         def wrapper(self, *args, **kwargs):
             # First, purge from cache
             key = keygen(*args, **kwargs)
-            del self._cache[key]
+            self._cache.delete(key)
 
             # Remove/delete from origin
             remover(self, *args, **kwargs)
@@ -127,9 +128,9 @@ def caches(keygen, ttl, cond=None):
         @functools.wraps(getter)
         def wrapper(self, *args, **kwargs):
             key = keygen(*args, **kwargs)
-            packed_value = self._cache.get(key)
+            packed_value = self._cache.get(key, expiration_time=ttl)
 
-            if packed_value is None:
+            if packed_value is core.NO_VALUE:
                 value = getter(self, *args, **kwargs)
 
                 # Cache new value if desired
@@ -141,8 +142,7 @@ def caches(keygen, ttl, cond=None):
                     # str format family.
                     packed_value = msgpack.packb(value, use_bin_type=True)
 
-                    if not self._cache.set(key, packed_value, ttl):
-                        LOG.warn('Failed to cache key: ' + key)
+                    self._cache.set(key, packed_value)
                 else:
                     # NOTE(kgriffs): unpackb does not default to UTF-8,
                     # so we have to explicitly ask for it.
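
The substance of these hunks: a dogpile-backed region reports a miss with the
core.NO_VALUE sentinel rather than None, takes the TTL when reading
(expiration_time) rather than when writing, and uses delete() in place of the
old dict-style del. A condensed sketch of the resulting read-through pattern
(function and argument names are illustrative, not Zaqar's actual wrapper):

    import msgpack
    from oslo_cache import core

    def cached_get(cache, key, getter, ttl):
        packed = cache.get(key, expiration_time=ttl)
        if packed is core.NO_VALUE:
            value = getter()
            cache.set(key, msgpack.packb(value, use_bin_type=True))
        else:
            # unpackb does not default to UTF-8, so ask for it explicitly
            value = msgpack.unpackb(packed, encoding='utf-8')
        return value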

zaqar/openstack/common/cache/_backends/memory.py (vendored; 165 lines deleted)
@@ -1,165 +0,0 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import collections

from zaqar.openstack.common.cache import backends
from zaqar.openstack.common import lockutils
from zaqar.openstack.common import timeutils


class MemoryBackend(backends.BaseCache):

    def __init__(self, parsed_url, options=None):
        super(MemoryBackend, self).__init__(parsed_url, options)
        self._clear()

    def _set_unlocked(self, key, value, ttl=0):
        expires_at = 0
        if ttl != 0:
            expires_at = timeutils.utcnow_ts() + ttl

        self._cache[key] = (expires_at, value)

        if expires_at:
            self._keys_expires[expires_at].add(key)

    def _set(self, key, value, ttl=0, not_exists=False):
        with lockutils.lock(key):

            # NOTE(flaper87): This is needed just in `set`
            # calls, hence it's not in `_set_unlocked`
            if not_exists and self._exists_unlocked(key):
                return False

            self._set_unlocked(key, value, ttl)
            return True

    def _get_unlocked(self, key, default=None):
        now = timeutils.utcnow_ts()

        try:
            timeout, value = self._cache[key]
        except KeyError:
            return 0, default

        if timeout and now >= timeout:

            # NOTE(flaper87): Record expired,
            # remove it from the cache but catch
            # KeyError and ValueError in case
            # _purge_expired removed this key already.
            try:
                del self._cache[key]
            except KeyError:
                pass

            try:
                # NOTE(flaper87): Keys with ttl == 0
                # don't exist in the _keys_expires dict
                self._keys_expires[timeout].remove(key)
            except (KeyError, ValueError):
                pass

            return 0, default

        return timeout, value

    def _get(self, key, default=None):
        with lockutils.lock(key):
            return self._get_unlocked(key, default)[1]

    def _exists_unlocked(self, key):
        now = timeutils.utcnow_ts()
        try:
            timeout = self._cache[key][0]
            return not timeout or now <= timeout
        except KeyError:
            return False

    def __contains__(self, key):
        with lockutils.lock(key):
            return self._exists_unlocked(key)

    def _incr_append(self, key, other):
        with lockutils.lock(key):
            timeout, value = self._get_unlocked(key)

            if value is None:
                return None

            ttl = timeutils.utcnow_ts() - timeout
            new_value = value + other
            self._set_unlocked(key, new_value, ttl)
            return new_value

    def _incr(self, key, delta):
        if not isinstance(delta, int):
            raise TypeError('delta must be an int instance')

        return self._incr_append(key, delta)

    def _append_tail(self, key, tail):
        return self._incr_append(key, tail)

    def _purge_expired(self):
        """Removes expired keys from the cache."""

        now = timeutils.utcnow_ts()
        for timeout in sorted(self._keys_expires.keys()):

            # NOTE(flaper87): If timeout is greater
            # than `now`, stop the iteration, remaining
            # keys have not expired.
            if now < timeout:
                break

            # NOTE(flaper87): Unset every key in
            # this set from the cache if its timeout
            # is equal to `timeout`. (The key might
            # have been updated)
            for subkey in self._keys_expires.pop(timeout):
                try:
                    if self._cache[subkey][0] == timeout:
                        del self._cache[subkey]
                except KeyError:
                    continue

    def __delitem__(self, key):
        self._purge_expired()

        # NOTE(flaper87): Delete the key. Using pop
        # since it could have been deleted already
        value = self._cache.pop(key, None)

        if value:
            try:
                # NOTE(flaper87): Keys with ttl == 0
                # don't exist in the _keys_expires dict
                self._keys_expires[value[0]].remove(key)
            except (KeyError, ValueError):
                pass

    def _clear(self):
        self._cache = {}
        self._keys_expires = collections.defaultdict(set)

    def _get_many(self, keys, default):
        return super(MemoryBackend, self)._get_many(keys, default)

    def _set_many(self, data, ttl=0):
        return super(MemoryBackend, self)._set_many(data, ttl)

    def _unset_many(self, keys):
        return super(MemoryBackend, self)._unset_many(keys)

zaqar/openstack/common/cache/backends.py (vendored; 250 lines deleted)
@@ -1,250 +0,0 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import abc

import six


NOTSET = object()


@six.add_metaclass(abc.ABCMeta)
class BaseCache(object):
    """Base Cache Abstraction

    :params parsed_url: Parsed url object.
    :params options: A dictionary with configuration parameters
        for the cache. For example:

        - default_ttl: An integer defining the default ttl for keys.
    """

    def __init__(self, parsed_url, options=None):
        self._parsed_url = parsed_url
        self._options = options or {}
        self._default_ttl = int(self._options.get('default_ttl', 0))

    @abc.abstractmethod
    def _set(self, key, value, ttl, not_exists=False):
        """Implementations of this class have to override this method."""

    def set(self, key, value, ttl, not_exists=False):
        """Sets or updates a cache entry

        .. note:: Thread-safety is required and has to be guaranteed by the
            backend implementation.

        :params key: Item key as string.
        :type key: `unicode string`
        :params value: Value to assign to the key. This can be anything that
            is handled by the current backend.
        :params ttl: Key's timeout in seconds. 0 means no timeout.
        :type ttl: int
        :params not_exists: If True, the key will be set if it doesn't exist.
            Otherwise, it'll always be set.
        :type not_exists: bool

        :returns: True if the operation succeeds, False otherwise.
        """
        if ttl is None:
            ttl = self._default_ttl

        return self._set(key, value, ttl, not_exists)

    def __setitem__(self, key, value):
        self.set(key, value, self._default_ttl)

    def setdefault(self, key, value):
        """Sets the key value to `value` if it doesn't exist

        :params key: Item key as string.
        :type key: `unicode string`
        :params value: Value to assign to the key. This can be anything that
            is handled by the current backend.
        """
        try:
            return self[key]
        except KeyError:
            self[key] = value
            return value

    @abc.abstractmethod
    def _get(self, key, default):
        """Implementations of this class have to override this method."""

    def get(self, key, default=None):
        """Gets one item from the cache

        .. note:: Thread-safety is required and it has to be guaranteed
            by the backend implementation.

        :params key: Key for the item to retrieve from the cache.
        :params default: The default value to return.

        :returns: `key`'s value in the cache if it exists, otherwise
            `default` should be returned.
        """
        return self._get(key, default)

    def __getitem__(self, key):
        value = self.get(key, NOTSET)

        if value is NOTSET:
            raise KeyError

        return value

    @abc.abstractmethod
    def __delitem__(self, key):
        """Removes an item from cache.

        .. note:: Thread-safety is required and it has to be guaranteed by
            the backend implementation.

        :params key: The key to remove.

        :returns: The key value if there's one
        """

    @abc.abstractmethod
    def _clear(self):
        """Implementations of this class have to override this method."""

    def clear(self):
        """Removes all items from the cache.

        .. note:: Thread-safety is required and it has to be guaranteed by
            the backend implementation.
        """
        return self._clear()

    @abc.abstractmethod
    def _incr(self, key, delta):
        """Implementations of this class have to override this method."""

    def incr(self, key, delta=1):
        """Increments the value for a key

        :params key: The key for the value to be incremented
        :params delta: Number of units by which to increment the value.
            Pass a negative number to decrement the value.

        :returns: The new value
        """
        return self._incr(key, delta)

    @abc.abstractmethod
    def _append_tail(self, key, tail):
        """Implementations of this class have to override this method."""

    def append_tail(self, key, tail):
        """Appends `tail` to `key`'s value.

        :params key: The key of the value to which `tail` should be appended.
        :params tail: The list of values to append to the original.

        :returns: The new value
        """

        if not hasattr(tail, "__iter__"):
            raise TypeError('Tail must be an iterable')

        if not isinstance(tail, list):
            # NOTE(flaper87): Make sure we pass a list
            # down to the implementation. Not all drivers
            # have support for generators, sets or other
            # iterables.
            tail = list(tail)

        return self._append_tail(key, tail)

    def append(self, key, value):
        """Appends `value` to `key`'s value.

        :params key: The key of the value to which `tail` should be appended.
        :params value: The value to append to the original.

        :returns: The new value
        """
        return self.append_tail(key, [value])

    @abc.abstractmethod
    def __contains__(self, key):
        """Verifies that a key exists.

        :params key: The key to verify.

        :returns: True if the key exists, otherwise False.
        """

    @abc.abstractmethod
    def _get_many(self, keys, default):
        """Implementations of this class have to override this method."""
        return ((k, self.get(k, default=default)) for k in keys)

    def get_many(self, keys, default=NOTSET):
        """Gets keys' value from cache

        :params keys: List of keys to retrieve.
        :params default: The default value to return for each key that is not
            in the cache.

        :returns: A generator of (key, value)
        """
        return self._get_many(keys, default)

    @abc.abstractmethod
    def _set_many(self, data, ttl):
        """Implementations of this class have to override this method."""

        for key, value in data.items():
            self.set(key, value, ttl=ttl)

    def set_many(self, data, ttl=None):
        """Puts several items into the cache at once

        Depending on the backend, this operation may or may not be efficient.
        The default implementation calls set for each (key, value) pair
        passed, other backends support set_many operations as part of their
        protocols.

        :params data: A dictionary like {key: val} to store in the cache.
        :params ttl: Key's timeout in seconds.
        """

        if ttl is None:
            ttl = self._default_ttl

        self._set_many(data, ttl)

    def update(self, **kwargs):
        """Sets several (key, value) paris.

        Refer to the `set_many` docstring.
        """
        self.set_many(kwargs, ttl=self._default_ttl)

    @abc.abstractmethod
    def _unset_many(self, keys):
        """Implementations of this class have to override this method."""
        for key in keys:
            del self[key]

    def unset_many(self, keys):
        """Removes several keys from the cache at once

        :params keys: List of keys to unset.
        """
        self._unset_many(keys)

zaqar/openstack/common/cache/cache.py (vendored; 78 lines deleted)
@@ -1,78 +0,0 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Cache library.

Supported configuration options:

`default_backend`: Name of the cache backend to use.
`key_namespace`: Namespace under which keys will be created.
"""

from six.moves.urllib import parse
from stevedore import driver


def _get_olso_configs():
    """Returns the oslo_config options to register."""
    # NOTE(flaper87): Oslo config should be
    # optional. Instead of doing try / except
    # at the top of this file, lets import cfg
    # here and assume that the caller of this
    # function already took care of this dependency.
    from oslo_config import cfg

    return [
        cfg.StrOpt('cache_url', default='memory://',
                   help='URL to connect to the cache back end.')
    ]


def register_oslo_configs(conf):
    """Registers a cache configuration options

    :params conf: Config object.
    :type conf: `cfg.ConfigOptions`
    """
    conf.register_opts(_get_olso_configs())


def get_cache(url='memory://'):
    """Loads the cache backend

    This function loads the cache backend
    specified in the given configuration.

    :param conf: Configuration instance to use
    """

    parsed = parse.urlparse(url)
    backend = parsed.scheme

    query = parsed.query
    # NOTE(flaper87): We need the following hack
    # for python versions < 2.7.5. Previous versions
    # of python parsed query params just for 'known'
    # schemes. This was changed in this patch:
    # http://hg.python.org/cpython/rev/79e6ff3d9afd
    if not query and '?' in parsed.path:
        query = parsed.path.split('?', 1)[-1]
    parameters = parse.parse_qsl(query)
    kwargs = {'options': dict(parameters)}

    mgr = driver.DriverManager('zaqar.openstack.common.cache.backends',
                               backend,
                               invoke_on_load=True,
                               invoke_args=[parsed],
                               invoke_kwds=kwargs)
    return mgr.driver
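
For contrast with zaqar/common/cache.py above: the removed module resolved a
backend class from the URL scheme at call time via stevedore, e.g. (old,
now-deleted API; the URL and TTL values are illustrative):

    from zaqar.openstack.common.cache import cache

    mgr = cache.get_cache('memory://?default_ttl=30')
    mgr.set('key', 'value', ttl=30)
    mgr.get('key')  # 'value', until the TTL expires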

zaqar/openstack/common/excutils.py (113 lines deleted)
@@ -1,113 +0,0 @@
# Copyright 2011 OpenStack Foundation.
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""
Exception related utilities.
"""

import logging
import sys
import time
import traceback

import six

from zaqar.openstack.common.gettextutils import _LE


class save_and_reraise_exception(object):
    """Save current exception, run some code and then re-raise.

    In some cases the exception context can be cleared, resulting in None
    being attempted to be re-raised after an exception handler is run. This
    can happen when eventlet switches greenthreads or when running an
    exception handler, code raises and catches an exception. In both
    cases the exception context will be cleared.

    To work around this, we save the exception state, run handler code, and
    then re-raise the original exception. If another exception occurs, the
    saved exception is logged and the new exception is re-raised.

    In some cases the caller may not want to re-raise the exception, and
    for those circumstances this context provides a reraise flag that
    can be used to suppress the exception. For example::

      except Exception:
          with save_and_reraise_exception() as ctxt:
              decide_if_need_reraise()
              if not should_be_reraised:
                  ctxt.reraise = False

    If another exception occurs and reraise flag is False,
    the saved exception will not be logged.

    If the caller wants to raise new exception during exception handling
    he/she sets reraise to False initially with an ability to set it back to
    True if needed::

      except Exception:
          with save_and_reraise_exception(reraise=False) as ctxt:
              [if statements to determine whether to raise a new exception]
              # Not raising a new exception, so reraise
              ctxt.reraise = True
    """
    def __init__(self, reraise=True):
        self.reraise = reraise

    def __enter__(self):
        self.type_, self.value, self.tb, = sys.exc_info()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            if self.reraise:
                logging.error(_LE('Original exception being dropped: %s'),
                              traceback.format_exception(self.type_,
                                                         self.value,
                                                         self.tb))
            return False
        if self.reraise:
            six.reraise(self.type_, self.value, self.tb)


def forever_retry_uncaught_exceptions(infunc):
    def inner_func(*args, **kwargs):
        last_log_time = 0
        last_exc_message = None
        exc_count = 0
        while True:
            try:
                return infunc(*args, **kwargs)
            except Exception as exc:
                this_exc_message = six.u(str(exc))
                if this_exc_message == last_exc_message:
                    exc_count += 1
                else:
                    exc_count = 1
                # Do not log any more frequently than once a minute unless
                # the exception message changes
                cur_time = int(time.time())
                if (cur_time - last_log_time > 60 or
                        this_exc_message != last_exc_message):
                    logging.exception(
                        _LE('Unexpected exception occurred %d time(s)... '
                            'retrying.') % exc_count)
                    last_log_time = cur_time
                    last_exc_message = this_exc_message
                    exc_count = 0
                # This should be a very rare event. In case it isn't, do
                # a sleep.
                time.sleep(1)
    return inner_func

zaqar/openstack/common/fileutils.py (146 lines deleted)
@@ -1,146 +0,0 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import contextlib
import errno
import os
import tempfile

from zaqar.openstack.common import excutils
from oslo_log import log as logging

LOG = logging.getLogger(__name__)

_FILE_CACHE = {}


def ensure_tree(path):
    """Create a directory (and any ancestor directories required)

    :param path: Directory to create
    """
    try:
        os.makedirs(path)
    except OSError as exc:
        if exc.errno == errno.EEXIST:
            if not os.path.isdir(path):
                raise
        else:
            raise


def read_cached_file(filename, force_reload=False):
    """Read from a file if it has been modified.

    :param force_reload: Whether to reload the file.
    :returns: A tuple with a boolean specifying if the data is fresh
              or not.
    """
    global _FILE_CACHE

    if force_reload:
        delete_cached_file(filename)

    reloaded = False
    mtime = os.path.getmtime(filename)
    cache_info = _FILE_CACHE.setdefault(filename, {})

    if not cache_info or mtime > cache_info.get('mtime', 0):
        LOG.debug("Reloading cached file %s" % filename)
        with open(filename) as fap:
            cache_info['data'] = fap.read()
        cache_info['mtime'] = mtime
        reloaded = True
    return reloaded, cache_info['data']


def delete_cached_file(filename):
    """Delete cached file if present.

    :param filename: filename to delete
    """
    global _FILE_CACHE

    if filename in _FILE_CACHE:
        del _FILE_CACHE[filename]


def delete_if_exists(path, remove=os.unlink):
    """Delete a file, but ignore file not found error.

    :param path: File to delete
    :param remove: Optional function to remove passed path
    """

    try:
        remove(path)
    except OSError as e:
        if e.errno != errno.ENOENT:
            raise


@contextlib.contextmanager
def remove_path_on_error(path, remove=delete_if_exists):
    """Protect code that wants to operate on PATH atomically.
    Any exception will cause PATH to be removed.

    :param path: File to work with
    :param remove: Optional function to remove passed path
    """

    try:
        yield
    except Exception:
        with excutils.save_and_reraise_exception():
            remove(path)


def file_open(*args, **kwargs):
    """Open file

    see built-in open() documentation for more details

    Note: The reason this is kept in a separate module is to easily
    be able to provide a stub module that doesn't alter system
    state at all (for unit tests)
    """
    return open(*args, **kwargs)


def write_to_tempfile(content, path=None, suffix='', prefix='tmp'):
    """Create temporary file or use existing file.

    This util is needed for creating temporary file with
    specified content, suffix and prefix. If path is not None,
    it will be used for writing content. If the path doesn't
    exist it'll be created.

    :param content: content for temporary file.
    :param path: same as parameter 'dir' for mkstemp
    :param suffix: same as parameter 'suffix' for mkstemp
    :param prefix: same as parameter 'prefix' for mkstemp

    For example: it can be used in database tests for creating
    configuration files.
    """
    if path:
        ensure_tree(path)

    (fd, path) = tempfile.mkstemp(suffix=suffix, dir=path, prefix=prefix)
    try:
        os.write(fd, content)
    finally:
        os.close(fd)
    return path

zaqar/openstack/common/gettextutils.py (479 lines deleted)
@@ -1,479 +0,0 @@
# Copyright 2012 Red Hat, Inc.
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""
gettext for openstack-common modules.

Usual usage in an openstack.common module:

    from zaqar.openstack.common.gettextutils import _
"""

import copy
import gettext
import locale
from logging import handlers
import os

from babel import localedata
import six

_AVAILABLE_LANGUAGES = {}

# FIXME(dhellmann): Remove this when moving to oslo_i18n.
USE_LAZY = False


class TranslatorFactory(object):
    """Create translator functions
    """

    def __init__(self, domain, localedir=None):
        """Establish a set of translation functions for the domain.

        :param domain: Name of translation domain,
                       specifying a message catalog.
        :type domain: str
        :param lazy: Delays translation until a message is emitted.
                     Defaults to False.
        :type lazy: Boolean
        :param localedir: Directory with translation catalogs.
        :type localedir: str
        """
        self.domain = domain
        if localedir is None:
            localedir = os.environ.get(domain.upper() + '_LOCALEDIR')
        self.localedir = localedir

    def _make_translation_func(self, domain=None):
        """Return a new translation function ready for use.

        Takes into account whether or not lazy translation is being
        done.

        The domain can be specified to override the default from the
        factory, but the localedir from the factory is always used
        because we assume the log-level translation catalogs are
        installed in the same directory as the main application
        catalog.

        """
        if domain is None:
            domain = self.domain
        t = gettext.translation(domain,
                                localedir=self.localedir,
                                fallback=True)
        # Use the appropriate method of the translation object based
        # on the python version.
        m = t.gettext if six.PY3 else t.ugettext

        def f(msg):
            """oslo_i18n.gettextutils translation function."""
            if USE_LAZY:
                return Message(msg, domain=domain)
            return m(msg)
        return f

    @property
    def primary(self):
        "The default translation function."
        return self._make_translation_func()

    def _make_log_translation_func(self, level):
        return self._make_translation_func(self.domain + '-log-' + level)

    @property
    def log_info(self):
        "Translate info-level log messages."
        return self._make_log_translation_func('info')

    @property
    def log_warning(self):
        "Translate warning-level log messages."
        return self._make_log_translation_func('warning')

    @property
    def log_error(self):
        "Translate error-level log messages."
        return self._make_log_translation_func('error')

    @property
    def log_critical(self):
        "Translate critical-level log messages."
        return self._make_log_translation_func('critical')


# NOTE(dhellmann): When this module moves out of the incubator into
# oslo_i18n, these global variables can be moved to an integration
# module within each application.

# Create the global translation functions.
_translators = TranslatorFactory('zaqar')

# The primary translation function using the well-known name "_"
_ = _translators.primary

# Translators for log levels.
#
# The abbreviated names are meant to reflect the usual use of a short
# name like '_'. The "L" is for "log" and the other letter comes from
# the level.
_LI = _translators.log_info
_LW = _translators.log_warning
_LE = _translators.log_error
_LC = _translators.log_critical

# NOTE(dhellmann): End of globals that will move to the application's
# integration module.


def enable_lazy():
    """Convenience function for configuring _() to use lazy gettext

    Call this at the start of execution to enable the gettextutils._
    function to use lazy gettext functionality. This is useful if
    your project is importing _ directly instead of using the
    gettextutils.install() way of importing the _ function.
    """
    global USE_LAZY
    USE_LAZY = True


def install(domain):
    """Install a _() function using the given translation domain.

    Given a translation domain, install a _() function using gettext's
    install() function.

    The main difference from gettext.install() is that we allow
    overriding the default localedir (e.g. /usr/share/locale) using
    a translation-domain-specific environment variable (e.g.
    NOVA_LOCALEDIR).

    Note that to enable lazy translation, enable_lazy must be
    called.

    :param domain: the translation domain
    """
    from six import moves
    tf = TranslatorFactory(domain)
    moves.builtins.__dict__['_'] = tf.primary


class Message(six.text_type):
    """A Message object is a unicode object that can be translated.

    Translation of Message is done explicitly using the translate() method.
    For all non-translation intents and purposes, a Message is simply unicode,
    and can be treated as such.
    """

    def __new__(cls, msgid, msgtext=None, params=None,
                domain='zaqar', *args):
        """Create a new Message object.

        In order for translation to work gettext requires a message ID, this
        msgid will be used as the base unicode text. It is also possible
        for the msgid and the base unicode text to be different by passing
        the msgtext parameter.
        """
        # If the base msgtext is not given, we use the default translation
        # of the msgid (which is in English) just in case the system locale is
        # not English, so that the base text will be in that locale by default.
        if not msgtext:
            msgtext = Message._translate_msgid(msgid, domain)
        # We want to initialize the parent unicode with the actual object that
        # would have been plain unicode if 'Message' was not enabled.
        msg = super(Message, cls).__new__(cls, msgtext)
        msg.msgid = msgid
        msg.domain = domain
        msg.params = params
        return msg

    def translate(self, desired_locale=None):
        """Translate this message to the desired locale.

        :param desired_locale: The desired locale to translate the message to,
                               if no locale is provided the message will be
                               translated to the system's default locale.

        :returns: the translated message in unicode
        """

        translated_message = Message._translate_msgid(self.msgid,
                                                      self.domain,
                                                      desired_locale)
        if self.params is None:
            # No need for more translation
            return translated_message

        # This Message object may have been formatted with one or more
        # Message objects as substitution arguments, given either as a single
        # argument, part of a tuple, or as one or more values in a dictionary.
        # When translating this Message we need to translate those Messages too
        translated_params = _translate_args(self.params, desired_locale)

        translated_message = translated_message % translated_params

        return translated_message

    @staticmethod
    def _translate_msgid(msgid, domain, desired_locale=None):
        if not desired_locale:
            system_locale = locale.getdefaultlocale()
            # If the system locale is not available to the runtime use English
            if not system_locale[0]:
                desired_locale = 'en_US'
            else:
                desired_locale = system_locale[0]

        locale_dir = os.environ.get(domain.upper() + '_LOCALEDIR')
        lang = gettext.translation(domain,
                                   localedir=locale_dir,
                                   languages=[desired_locale],
                                   fallback=True)
        if six.PY3:
            translator = lang.gettext
        else:
            translator = lang.ugettext

        translated_message = translator(msgid)
        return translated_message

    def __mod__(self, other):
        # When we mod a Message we want the actual operation to be performed
        # by the parent class (i.e. unicode()), the only thing we do here is
        # save the original msgid and the parameters in case of a translation
        params = self._sanitize_mod_params(other)
        unicode_mod = super(Message, self).__mod__(params)
        modded = Message(self.msgid,
                         msgtext=unicode_mod,
                         params=params,
                         domain=self.domain)
        return modded

    def _sanitize_mod_params(self, other):
        """Sanitize the object being modded with this Message.

        - Add support for modding 'None' so translation supports it
        - Trim the modded object, which can be a large dictionary, to only
          those keys that would actually be used in a translation
        - Snapshot the object being modded, in case the message is
          translated, it will be used as it was when the Message was created
        """
        if other is None:
            params = (other,)
        elif isinstance(other, dict):
            # Merge the dictionaries
            # Copy each item in case one does not support deep copy.
            params = {}
            if isinstance(self.params, dict):
                for key, val in self.params.items():
                    params[key] = self._copy_param(val)
            for key, val in other.items():
                params[key] = self._copy_param(val)
        else:
            params = self._copy_param(other)
        return params

    def _copy_param(self, param):
        try:
            return copy.deepcopy(param)
        except Exception:
            # Fallback to casting to unicode this will handle the
            # python code-like objects that can't be deep-copied
            return six.text_type(param)

    def __add__(self, other):
        msg = _('Message objects do not support addition.')
        raise TypeError(msg)

    def __radd__(self, other):
        return self.__add__(other)

    if six.PY2:
        def __str__(self):
            # NOTE(luisg): Logging in python 2.6 tries to str() log records,
            # and it expects specifically a UnicodeError in order to proceed.
            msg = _('Message objects do not support str() because they may '
                    'contain non-ascii characters. '
                    'Please use unicode() or translate() instead.')
            raise UnicodeError(msg)


def get_available_languages(domain):
    """Lists the available languages for the given translation domain.

    :param domain: the domain to get languages for
    """
    if domain in _AVAILABLE_LANGUAGES:
        return copy.copy(_AVAILABLE_LANGUAGES[domain])

    localedir = '%s_LOCALEDIR' % domain.upper()
    find = lambda x: gettext.find(domain,
                                  localedir=os.environ.get(localedir),
                                  languages=[x])

    # NOTE(mrodden): en_US should always be available (and first in case
    # order matters) since our in-line message strings are en_US
    language_list = ['en_US']
    # NOTE(luisg): Babel <1.0 used a function called list(), which was
    # renamed to locale_identifiers() in >=1.0, the requirements master list
    # requires >=0.9.6, uncapped, so defensively work with both. We can remove
    # this check when the master list updates to >=1.0, and update all projects
    list_identifiers = (getattr(localedata, 'list', None) or
                        getattr(localedata, 'locale_identifiers'))
    locale_identifiers = list_identifiers()

    for i in locale_identifiers:
        if find(i) is not None:
            language_list.append(i)

    # NOTE(luisg): Babel>=1.0,<1.3 has a bug where some OpenStack supported
    # locales (e.g. 'zh_CN', and 'zh_TW') aren't supported even though they
    # are perfectly legitimate locales:
    #     https://github.com/mitsuhiko/babel/issues/37
    # In Babel 1.3 they fixed the bug and they support these locales, but
    # they are still not explicitly "listed" by locale_identifiers().
    # That is why we add the locales here explicitly if necessary so that
    # they are listed as supported.
    aliases = {'zh': 'zh_CN',
               'zh_Hant_HK': 'zh_HK',
               'zh_Hant': 'zh_TW',
               'fil': 'tl_PH'}
    for (locale_, alias) in six.iteritems(aliases):
        if locale_ in language_list and alias not in language_list:
            language_list.append(alias)

    _AVAILABLE_LANGUAGES[domain] = language_list
    return copy.copy(language_list)


def translate(obj, desired_locale=None):
    """Gets the translated unicode representation of the given object.

    If the object is not translatable it is returned as-is.
    If the locale is None the object is translated to the system locale.

    :param obj: the object to translate
    :param desired_locale: the locale to translate the message to, if None the
                           default system locale will be used
    :returns: the translated object in unicode, or the original object if
              it could not be translated
    """
    message = obj
    if not isinstance(message, Message):
        # If the object to translate is not already translatable,
        # let's first get its unicode representation
        message = six.text_type(obj)
    if isinstance(message, Message):
        # Even after unicoding() we still need to check if we are
        # running with translatable unicode before translating
        return message.translate(desired_locale)
    return obj


def _translate_args(args, desired_locale=None):
    """Translates all the translatable elements of the given arguments object.

    This method is used for translating the translatable values in method
    arguments which include values of tuples or dictionaries.
    If the object is not a tuple or a dictionary the object itself is
    translated if it is translatable.

    If the locale is None the object is translated to the system locale.

    :param args: the args to translate
    :param desired_locale: the locale to translate the args to, if None the
                           default system locale will be used
    :returns: a new args object with the translated contents of the original
    """
    if isinstance(args, tuple):
        return tuple(translate(v, desired_locale) for v in args)
    if isinstance(args, dict):
        translated_dict = {}
        for (k, v) in six.iteritems(args):
            translated_v = translate(v, desired_locale)
            translated_dict[k] = translated_v
        return translated_dict
    return translate(args, desired_locale)


class TranslationHandler(handlers.MemoryHandler):
    """Handler that translates records before logging them.

    The TranslationHandler takes a locale and a target logging.Handler object
    to forward LogRecord objects to after translating them. This handler
    depends on Message objects being logged, instead of regular strings.

    The handler can be configured declaratively in the logging.conf as follows:

        [handlers]
        keys = translatedlog, translator

        [handler_translatedlog]
        class = handlers.WatchedFileHandler
        args = ('/var/log/api-localized.log',)
        formatter = context

        [handler_translator]
        class = openstack.common.log.TranslationHandler
        target = translatedlog
        args = ('zh_CN',)

    If the specified locale is not available in the system, the handler will
    log in the default locale.
    """

    def __init__(self, locale=None, target=None):
        """Initialize a TranslationHandler

        :param locale: locale to use for translating messages
        :param target: logging.Handler object to forward
                       LogRecord objects to after translation
        """
        # NOTE(luisg): In order to allow this handler to be a wrapper for
        # other handlers, such as a FileHandler, and still be able to
        # configure it using logging.conf, this handler has to extend
        # MemoryHandler because only the MemoryHandlers' logging.conf
        # parsing is implemented such that it accepts a target handler.
        handlers.MemoryHandler.__init__(self, capacity=0, target=target)
        self.locale = locale

    def setFormatter(self, fmt):
        self.target.setFormatter(fmt)

    def emit(self, record):
        # We save the message from the original record to restore it
        # after translation, so other handlers are not affected by this
        original_msg = record.msg
        original_args = record.args

        try:
            self._translate_and_log_record(record)
        finally:
            record.msg = original_msg
            record.args = original_args

    def _translate_and_log_record(self, record):
        record.msg = translate(record.msg, self.locale)

        # In addition to translating the message, we also need to translate
        # arguments that were passed to the log method that were not part
        # of the main message e.g., log.info(_('Some message %s'), this_one))
        record.args = _translate_args(record.args, self.locale)

        self.target.emit(record)
@ -1,377 +0,0 @@
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import errno
|
||||
import functools
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
import weakref
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from zaqar.openstack.common import fileutils
|
||||
from zaqar.openstack.common.gettextutils import _, _LE, _LI
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
util_opts = [
|
||||
cfg.BoolOpt('disable_process_locking', default=False,
|
||||
help='Enables or disables inter-process locks.'),
|
||||
cfg.StrOpt('lock_path',
|
||||
default=os.environ.get("ZAQAR_LOCK_PATH"),
|
||||
help='Directory to use for lock files.')
|
||||
]
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(util_opts)
|
||||
|
||||
|
||||
def set_defaults(lock_path):
|
||||
cfg.set_defaults(util_opts, lock_path=lock_path)
|
||||
|
||||
|
||||
class _FileLock(object):
|
||||
"""Lock implementation which allows multiple locks, working around
|
||||
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
|
||||
not require any cleanup. Since the lock is always held on a file
|
||||
descriptor rather than outside of the process, the lock gets dropped
|
||||
automatically if the process crashes, even if __exit__ is not executed.
|
||||
|
||||
There are no guarantees regarding usage by multiple green threads in a
|
||||
single process here. This lock works only between processes. Exclusive
|
||||
access between local threads should be achieved using the semaphores
|
||||
in the @synchronized decorator.
|
||||
|
||||
Note these locks are released when the descriptor is closed, so it's not
|
||||
safe to close the file descriptor while another green thread holds the
|
||||
lock. Just opening and closing the lock file can break synchronisation,
|
||||
so lock files must be accessed only using this abstraction.
|
||||
"""
|
||||
|
||||
def __init__(self, name):
|
||||
self.lockfile = None
|
||||
self.fname = name
|
||||
|
||||
def acquire(self):
|
||||
basedir = os.path.dirname(self.fname)
|
||||
|
||||
if not os.path.exists(basedir):
|
||||
fileutils.ensure_tree(basedir)
|
||||
LOG.info(_LI('Created lock path: %s'), basedir)
|
||||
|
||||
self.lockfile = open(self.fname, 'w')
|
||||
|
||||
while True:
|
||||
try:
|
||||
# Using non-blocking locks since green threads are not
|
||||
# patched to deal with blocking locking calls.
|
||||
# Also upon reading the MSDN docs for locking(), it seems
|
||||
# to have a laughable 10 attempts "blocking" mechanism.
|
||||
self.trylock()
|
||||
LOG.debug('Got file lock "%s"', self.fname)
|
||||
return True
|
||||
except IOError as e:
|
||||
if e.errno in (errno.EACCES, errno.EAGAIN):
|
||||
# external locks synchronise things like iptables
|
||||
# updates - give it some time to prevent busy spinning
|
||||
time.sleep(0.01)
|
||||
else:
|
||||
raise threading.ThreadError(_("Unable to acquire lock on"
|
||||
" `%(filename)s` due to"
|
||||
" %(exception)s") %
|
||||
{'filename': self.fname,
|
||||
'exception': e})
|
||||
|
||||
def __enter__(self):
|
||||
self.acquire()
|
||||
return self
|
||||
|
||||
def release(self):
|
||||
try:
|
||||
self.unlock()
|
||||
self.lockfile.close()
|
||||
LOG.debug('Released file lock "%s"', self.fname)
|
||||
except IOError:
|
||||
LOG.exception(_LE("Could not release the acquired lock `%s`"),
|
||||
self.fname)
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self.release()
|
||||
|
||||
def exists(self):
|
||||
return os.path.exists(self.fname)
|
||||
|
||||
def trylock(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
def unlock(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class _WindowsLock(_FileLock):
|
||||
def trylock(self):
|
||||
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1)
|
||||
|
||||
def unlock(self):
|
||||
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1)
|
||||
|
||||
|
||||
class _FcntlLock(_FileLock):
|
||||
def trylock(self):
|
||||
fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
|
||||
def unlock(self):
|
||||
fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
|
||||
|
||||
|
||||
class _PosixLock(object):
|
||||
def __init__(self, name):
|
||||
# Hash the name because it's not valid to have POSIX semaphore
|
||||
# names with things like / in them. Then use base64 to encode
|
||||
# the digest() instead taking the hexdigest() because the
|
||||
# result is shorter and most systems can't have shm sempahore
|
||||
# names longer than 31 characters.
|
||||
h = hashlib.sha1()
|
||||
h.update(name.encode('ascii'))
|
||||
self.name = str((b'/' + base64.urlsafe_b64encode(
|
||||
h.digest())).decode('ascii'))
|
||||
|
||||
def acquire(self, timeout=None):
|
||||
self.semaphore = posix_ipc.Semaphore(self.name,
|
||||
flags=posix_ipc.O_CREAT,
|
||||
initial_value=1)
|
||||
self.semaphore.acquire(timeout)
|
||||
return self
|
||||
|
||||
def __enter__(self):
|
||||
self.acquire()
|
||||
return self
|
||||
|
||||
def release(self):
|
||||
self.semaphore.release()
|
||||
self.semaphore.close()
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self.release()
|
||||
|
||||
def exists(self):
|
||||
try:
|
||||
semaphore = posix_ipc.Semaphore(self.name)
|
||||
except posix_ipc.ExistentialError:
|
||||
return False
|
||||
else:
|
||||
semaphore.close()
|
||||
return True
|
||||
|
||||
|
||||
if os.name == 'nt':
|
||||
import msvcrt
|
||||
InterProcessLock = _WindowsLock
|
||||
FileLock = _WindowsLock
|
||||
else:
|
||||
import base64
|
||||
import fcntl
|
||||
import hashlib
|
||||
|
||||
import posix_ipc
|
||||
InterProcessLock = _PosixLock
|
||||
FileLock = _FcntlLock
|
||||
|
||||
_semaphores = weakref.WeakValueDictionary()
|
||||
_semaphores_lock = threading.Lock()
|
||||
|
||||
|
||||
def _get_lock_path(name, lock_file_prefix, lock_path=None):
|
||||
# NOTE(mikal): the lock name cannot contain directory
|
||||
# separators
|
||||
name = name.replace(os.sep, '_')
|
||||
if lock_file_prefix:
|
||||
sep = '' if lock_file_prefix.endswith('-') else '-'
|
||||
name = '%s%s%s' % (lock_file_prefix, sep, name)
|
||||
|
||||
local_lock_path = lock_path or CONF.lock_path
|
||||
|
||||
if not local_lock_path:
|
||||
# NOTE(bnemec): Create a fake lock path for posix locks so we don't
|
||||
# unnecessarily raise the RequiredOptError below.
|
||||
if InterProcessLock is not _PosixLock:
|
||||
raise cfg.RequiredOptError('lock_path')
|
||||
local_lock_path = 'posixlock:/'
|
||||
|
||||
return os.path.join(local_lock_path, name)
|
||||
|
||||
|
||||
def external_lock(name, lock_file_prefix=None, lock_path=None):
|
||||
LOG.debug('Attempting to grab external lock "%(lock)s"',
|
||||
{'lock': name})
|
||||
|
||||
lock_file_path = _get_lock_path(name, lock_file_prefix, lock_path)
|
||||
|
||||
# NOTE(bnemec): If an explicit lock_path was passed to us then it
|
||||
# means the caller is relying on file-based locking behavior, so
|
||||
# we can't use posix locks for those calls.
|
||||
if lock_path:
|
||||
return FileLock(lock_file_path)
|
||||
return InterProcessLock(lock_file_path)
|
||||
|
||||
|
||||
def remove_external_lock_file(name, lock_file_prefix=None):
|
||||
"""Remove an external lock file when it's not used anymore
|
||||
This will be helpful when we have a lot of lock files
|
||||
"""
|
||||
with internal_lock(name):
|
||||
lock_file_path = _get_lock_path(name, lock_file_prefix)
|
||||
try:
|
||||
os.remove(lock_file_path)
|
||||
except OSError:
|
||||
LOG.info(_LI('Failed to remove file %(file)s'),
|
||||
{'file': lock_file_path})
|
||||
|
||||
|
||||
def internal_lock(name):
|
||||
with _semaphores_lock:
|
||||
try:
|
||||
sem = _semaphores[name]
|
||||
except KeyError:
|
||||
sem = threading.Semaphore()
|
||||
_semaphores[name] = sem
|
||||
|
||||
LOG.debug('Got semaphore "%(lock)s"', {'lock': name})
|
||||
return sem
|
||||
|
||||
|
||||
@contextlib.contextmanager
def lock(name, lock_file_prefix=None, external=False, lock_path=None):
    """Context based lock

    This function yields a `threading.Semaphore` instance (if we don't use
    eventlet.monkey_patch(), else `semaphore.Semaphore`) unless external is
    True, in which case, it'll yield an InterProcessLock instance.

    :param lock_file_prefix: The lock_file_prefix argument is used to provide
      lock files on disk with a meaningful prefix.

    :param external: The external keyword argument denotes whether this lock
      should work across multiple processes. This means that if two different
      workers both run a method decorated with @synchronized('mylock',
      external=True), only one of them will execute at a time.
    """
    int_lock = internal_lock(name)
    with int_lock:
        if external and not CONF.disable_process_locking:
            ext_lock = external_lock(name, lock_file_prefix, lock_path)
            with ext_lock:
                yield ext_lock
        else:
            yield int_lock
    LOG.debug('Released semaphore "%(lock)s"', {'lock': name})
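A usage sketch for the context manager above; the resource name, prefix, and do_work() are illustrative, not from the source::

    with lock('my-resource', lock_file_prefix='zaqar-', external=True):
        # At most one process enters here; without external=True the
        # exclusion applies only to threads within this process.
        do_work()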
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
    """Synchronization decorator.

    Decorating a method like so::

        @synchronized('mylock')
        def foo(self, *args):
           ...

    ensures that only one thread will execute the foo method at a time.

    Different methods can share the same lock::

        @synchronized('mylock')
        def foo(self, *args):
           ...

        @synchronized('mylock')
        def bar(self, *args):
           ...

    This way only one of either foo or bar can be executing at a time.
    """

    def wrap(f):
        @functools.wraps(f)
        def inner(*args, **kwargs):
            try:
                with lock(name, lock_file_prefix, external, lock_path):
                    LOG.debug('Got semaphore / lock "%(function)s"',
                              {'function': f.__name__})
                    return f(*args, **kwargs)
            finally:
                LOG.debug('Semaphore / lock released "%(function)s"',
                          {'function': f.__name__})
        return inner
    return wrap


def synchronized_with_prefix(lock_file_prefix):
    """Partial object generator for the synchronization decorator.

    Redefine @synchronized in each project like so::

        (in nova/utils.py)
        from nova.openstack.common import lockutils

        synchronized = lockutils.synchronized_with_prefix('nova-')


        (in nova/foo.py)
        from nova import utils

        @utils.synchronized('mylock')
        def bar(self, *args):
           ...

    The lock_file_prefix argument is used to provide lock files on disk with a
    meaningful prefix.
    """

    return functools.partial(synchronized, lock_file_prefix=lock_file_prefix)


def main(argv):
    """Create a dir for locks and pass it to the command from the arguments.

    If you run this:
        python -m openstack.common.lockutils python setup.py testr <etc>

    a temporary directory will be created for all your locks and passed to all
    your tests in an environment variable. The temporary dir will be deleted
    afterwards and the return value will be preserved.
    """

    lock_dir = tempfile.mkdtemp()
    os.environ["ZAQAR_LOCK_PATH"] = lock_dir
    try:
        ret_val = subprocess.call(argv[1:])
    finally:
        shutil.rmtree(lock_dir, ignore_errors=True)
    return ret_val


if __name__ == '__main__':
    sys.exit(main(sys.argv))
@ -1,210 +0,0 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""
Time related utilities and helper functions.
"""

import calendar
import datetime
import time

import iso8601
import six


# ISO 8601 extended time format with microseconds
_ISO8601_TIME_FORMAT_SUBSECOND = '%Y-%m-%dT%H:%M:%S.%f'
_ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'
PERFECT_TIME_FORMAT = _ISO8601_TIME_FORMAT_SUBSECOND


def isotime(at=None, subsecond=False):
    """Stringify time in ISO 8601 format."""
    if not at:
        at = utcnow()
    st = at.strftime(_ISO8601_TIME_FORMAT
                     if not subsecond
                     else _ISO8601_TIME_FORMAT_SUBSECOND)
    tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC'
    st += ('Z' if tz == 'UTC' else tz)
    return st


def parse_isotime(timestr):
    """Parse time from ISO 8601 format."""
    try:
        return iso8601.parse_date(timestr)
    except iso8601.ParseError as e:
        raise ValueError(six.text_type(e))
    except TypeError as e:
        raise ValueError(six.text_type(e))
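A quick round trip through the two helpers above (the timestamp shown is illustrative)::

    ts = isotime()           # e.g. '2015-09-04T10:20:30Z'
    dt = parse_isotime(ts)   # timezone-aware datetime in UTC
    assert isotime(dt) == ts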
def strtime(at=None, fmt=PERFECT_TIME_FORMAT):
    """Returns formatted utcnow."""
    if not at:
        at = utcnow()
    return at.strftime(fmt)


def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT):
    """Turn a formatted time back into a datetime."""
    return datetime.datetime.strptime(timestr, fmt)


def normalize_time(timestamp):
    """Normalize time in arbitrary timezone to UTC naive object."""
    offset = timestamp.utcoffset()
    if offset is None:
        return timestamp
    return timestamp.replace(tzinfo=None) - offset


def is_older_than(before, seconds):
    """Return True if before is older than seconds."""
    if isinstance(before, six.string_types):
        before = parse_strtime(before).replace(tzinfo=None)
    else:
        before = before.replace(tzinfo=None)

    return utcnow() - before > datetime.timedelta(seconds=seconds)


def is_newer_than(after, seconds):
    """Return True if after is newer than seconds."""
    if isinstance(after, six.string_types):
        after = parse_strtime(after).replace(tzinfo=None)
    else:
        after = after.replace(tzinfo=None)

    return after - utcnow() > datetime.timedelta(seconds=seconds)


def utcnow_ts():
    """Timestamp version of our utcnow function."""
    if utcnow.override_time is None:
        # NOTE(kgriffs): This is several times faster
        # than going through calendar.timegm(...)
        return int(time.time())

    return calendar.timegm(utcnow().timetuple())


def utcnow():
    """Overridable version of utils.utcnow."""
    if utcnow.override_time:
        try:
            return utcnow.override_time.pop(0)
        except AttributeError:
            return utcnow.override_time
    return datetime.datetime.utcnow()


def iso8601_from_timestamp(timestamp):
    """Returns an iso8601 formatted date from timestamp."""
    return isotime(datetime.datetime.utcfromtimestamp(timestamp))


utcnow.override_time = None


def set_time_override(override_time=None):
    """Overrides utils.utcnow.

    Make it return a constant time or a list thereof, one at a time.

    :param override_time: datetime instance or list thereof. If not
                          given, defaults to the current UTC time.
    """
    utcnow.override_time = override_time or datetime.datetime.utcnow()


def advance_time_delta(timedelta):
    """Advance overridden time using a datetime.timedelta."""
    assert utcnow.override_time is not None
    try:
        for dt in utcnow.override_time:
            dt += timedelta
    except TypeError:
        utcnow.override_time += timedelta


def advance_time_seconds(seconds):
    """Advance overridden time by seconds."""
    advance_time_delta(datetime.timedelta(0, seconds))


def clear_time_override():
    """Remove the overridden time."""
    utcnow.override_time = None
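The override hooks above exist for tests; a sketch of the intended flow, using only this module's helpers::

    set_time_override()       # freeze utcnow() at the current moment
    frozen = utcnow()
    advance_time_seconds(60)  # move the frozen clock forward one minute
    assert utcnow() - frozen == datetime.timedelta(seconds=60)
    clear_time_override()     # return to the real clock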
def marshall_now(now=None):
    """Make an rpc-safe datetime with microseconds.

    Note: tzinfo is stripped, but not required for relative times.
    """
    if not now:
        now = utcnow()
    return dict(day=now.day, month=now.month, year=now.year, hour=now.hour,
                minute=now.minute, second=now.second,
                microsecond=now.microsecond)


def unmarshall_time(tyme):
    """Unmarshall a datetime dict."""
    return datetime.datetime(day=tyme['day'],
                             month=tyme['month'],
                             year=tyme['year'],
                             hour=tyme['hour'],
                             minute=tyme['minute'],
                             second=tyme['second'],
                             microsecond=tyme['microsecond'])
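A round trip through the marshalling pair above::

    payload = marshall_now()             # plain dict, e.g. {'year': 2015, ...}
    restored = unmarshall_time(payload)  # naive datetime, microseconds intact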
def delta_seconds(before, after):
    """Return the difference between two timing objects.

    Compute the difference in seconds between two date, time, or
    datetime objects (as a float, to microsecond resolution).
    """
    delta = after - before
    return total_seconds(delta)


def total_seconds(delta):
    """Return the total seconds of a datetime.timedelta object.

    datetime.timedelta has no total_seconds() method in Python 2.6,
    so fall back to computing it manually.
    """
    try:
        return delta.total_seconds()
    except AttributeError:
        return ((delta.days * 24 * 3600) + delta.seconds +
                float(delta.microseconds) / (10 ** 6))


def is_soon(dt, window):
    """Determine whether a time is going to happen in the next window seconds.

    :param dt: the time
    :param window: minimum seconds to remain to consider the time not soon

    :return: True if expiration is within the given duration
    """
    soon = (utcnow() + datetime.timedelta(seconds=window))
    return normalize_time(dt) <= soon
@ -59,7 +59,7 @@ class DriverBase(object):
    :type conf: `oslo_config.ConfigOpts`
    :param cache: Cache instance to use for reducing latency
        for certain lookups.
    :type cache: `zaqar.openstack.common.cache.backends.BaseCache`
    :type cache: `dogpile.cache.region.CacheRegion`
    """

    _DRIVER_OPTIONS = []

@ -91,7 +91,7 @@ class DataDriverBase(DriverBase):
    :type conf: `oslo_config.ConfigOpts`
    :param cache: Cache instance to use for reducing latency
        for certain lookups.
    :type cache: `zaqar.openstack.common.cache.backends.BaseCache`
    :type cache: `dogpile.cache.region.CacheRegion`
    """

    BASE_CAPABILITIES = []

@ -258,7 +258,7 @@ class ControlDriverBase(DriverBase):
    :type conf: `oslo_config.ConfigOpts`
    :param cache: Cache instance to use for reducing latency
        for certain lookups.
    :type cache: `zaqar.openstack.common.cache.backends.BaseCache`
    :type cache: `dogpile.cache.region.CacheRegion`
    """

    @abc.abstractproperty
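For context on the type change above: the cache handed to these drivers is now a dogpile.cache region as produced by oslo.cache. A minimal sketch of building and using one (the backend choice mirrors the tests later in this diff; the enabled override and key names are illustrative)::

    from oslo_cache import core
    from oslo_config import cfg

    conf = cfg.ConfigOpts()
    core.configure(conf)  # registers the [cache] option group
    conf.set_override('enabled', True, group='cache')
    conf.set_override('backend', 'dogpile.cache.memory', group='cache')
    region = core.configure_cache_region(conf, core.create_region())

    region.set('key', 'value')
    assert region.get('key') == 'value'
    region.delete('key')
    assert region.get('key') is core.NO_VALUE  # dogpile's cache-miss sentinel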
@ -14,10 +14,11 @@
# limitations under the License.

import msgpack
from oslo_cache import core
from oslo_config import cfg

from zaqar.common import cache as oslo_cache
from zaqar.common import decorators
from zaqar.openstack.common.cache import cache as oslo_cache
from zaqar.tests import base


@ -38,8 +39,9 @@ class TestDecorators(base.TestBase):

    def test_cached(self):
        conf = cfg.ConfigOpts()
        oslo_cache.register_oslo_configs(conf)
        cache = oslo_cache.get_cache(conf.cache_url)
        oslo_cache.register_config(conf)
        conf.cache.backend = 'dogpile.cache.memory'
        cache = oslo_cache.get_cache(conf)

        sample_project = {
            u'name': u'Cats Abound',

@ -95,8 +97,9 @@ class TestDecorators(base.TestBase):

    def test_cached_with_cond(self):
        conf = cfg.ConfigOpts()
        oslo_cache.register_oslo_configs(conf)
        cache = oslo_cache.get_cache(conf.cache_url)
        oslo_cache.register_config(conf)
        conf.cache.backend = 'dogpile.cache.memory'
        cache = oslo_cache.get_cache(conf)

        class TestClass(object):

@ -132,7 +135,7 @@ class TestDecorators(base.TestBase):
        for i in range(3):
            user = instance.get_user(name)

            self.assertEqual(cache.get(name), None)
            self.assertEqual(cache.get(name), core.NO_VALUE)

            self.assertEqual(user, name)
            self.assertEqual(instance.user_gets, 2 + i)
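The last hunk captures a behavioral difference worth noting: the old cache returned None on a miss, while dogpile regions return the NO_VALUE sentinel. Callers that previously checked for None now need::

    from oslo_cache import core

    value = cache.get(name)
    if value is core.NO_VALUE:  # miss under dogpile.cache
        value = None            # old zaqar.openstack.common semantics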
@ -25,7 +25,7 @@ from oslo_utils import timeutils
import six
from testtools import matchers

from zaqar.openstack.common.cache import cache as oslo_cache
from zaqar.common import cache as oslo_cache
from zaqar import storage
from zaqar.storage import errors
from zaqar.storage import pipeline

@ -51,8 +51,8 @@ class ControllerBaseTest(testing.TestBase):
                            self.controller_class,
                            self.controller_base_class))

        oslo_cache.register_oslo_configs(self.conf)
        cache = oslo_cache.get_cache(self.conf.cache_url)
        oslo_cache.register_config(self.conf)
        cache = oslo_cache.get_cache(self.conf)

        pooling = 'pooling' in self.conf and self.conf.pooling
        if pooling and not self.control_driver_class:
@ -25,8 +25,8 @@ import pymongo.errors
import six
from testtools import matchers

from zaqar.common import cache as oslo_cache
from zaqar.common import configs
from zaqar.openstack.common.cache import cache as oslo_cache
from zaqar import storage
from zaqar.storage import errors
from zaqar.storage import mongodb

@ -156,10 +156,11 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase):

        self.conf.register_opts(configs._GENERAL_OPTIONS)
        self.config(unreliable=False)
        oslo_cache.register_config(self.conf)

    def test_db_instance(self):
        self.config(unreliable=True)
        cache = oslo_cache.get_cache()
        cache = oslo_cache.get_cache(self.conf)
        control = mongodb.ControlDriver(self.conf, cache)
        data = mongodb.DataDriver(self.conf, cache, control)

@ -169,7 +170,7 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase):

    def test_version_match(self):
        self.config(unreliable=True)
        cache = oslo_cache.get_cache()
        cache = oslo_cache.get_cache(self.conf)

        with mock.patch('pymongo.MongoClient.server_info') as info:
            info.return_value = {'version': '2.1'}

@ -186,7 +187,7 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase):
            self.fail('version match failed')

    def test_replicaset_or_mongos_needed(self):
        cache = oslo_cache.get_cache()
        cache = oslo_cache.get_cache(self.conf)

        with mock.patch('pymongo.MongoClient.nodes') as nodes:
            nodes.__get__ = mock.Mock(return_value=[])

@ -197,7 +198,7 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase):
                              mongodb.ControlDriver(self.conf, cache))

    def test_using_replset(self):
        cache = oslo_cache.get_cache()
        cache = oslo_cache.get_cache(self.conf)

        with mock.patch('pymongo.MongoClient.nodes') as nodes:
            nodes.__get__ = mock.Mock(return_value=['node1', 'node2'])

@ -209,7 +210,7 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase):
                              mongodb.ControlDriver(self.conf, cache))

    def test_using_mongos(self):
        cache = oslo_cache.get_cache()
        cache = oslo_cache.get_cache(self.conf)

        with mock.patch('pymongo.MongoClient.is_mongos') as is_mongos:
            is_mongos.__get__ = mock.Mock(return_value=True)

@ -221,7 +222,7 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase):
                              mongodb.ControlDriver(self.conf, cache))

    def test_write_concern_check_works(self):
        cache = oslo_cache.get_cache()
        cache = oslo_cache.get_cache(self.conf)

        with mock.patch('pymongo.MongoClient.is_mongos') as is_mongos:
            is_mongos.__get__ = mock.Mock(return_value=True)

@ -239,7 +240,7 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase):
                              mongodb.ControlDriver(self.conf, cache))

    def test_write_concern_is_set(self):
        cache = oslo_cache.get_cache()
        cache = oslo_cache.get_cache(self.conf)

        with mock.patch('pymongo.MongoClient.is_mongos') as is_mongos:
            is_mongos.__get__ = mock.Mock(return_value=True)
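All of these tests now follow the same two-step pattern introduced by this commit: register the cache options once, then build a region from the config::

    from zaqar.common import cache as oslo_cache

    oslo_cache.register_config(self.conf)    # once, e.g. in setUp()
    cache = oslo_cache.get_cache(self.conf)  # returns a dogpile.cache region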
@ -21,8 +21,8 @@ from oslo_utils import timeutils
from oslo_utils import uuidutils
import redis

from zaqar.common import cache as oslo_cache
from zaqar.common import errors
from zaqar.openstack.common.cache import cache as oslo_cache
from zaqar import storage
from zaqar.storage import mongodb
from zaqar.storage.redis import controllers

@ -173,7 +173,8 @@ class RedisDriverTest(testing.TestBase):
    config_file = 'wsgi_redis.conf'

    def test_db_instance(self):
        cache = oslo_cache.get_cache()
        oslo_cache.register_config(self.conf)
        cache = oslo_cache.get_cache(self.conf)
        redis_driver = driver.DataDriver(self.conf, cache,
                                         driver.ControlDriver
                                         (self.conf, cache))

@ -181,7 +182,8 @@ class RedisDriverTest(testing.TestBase):
        self.assertTrue(isinstance(redis_driver.connection, redis.StrictRedis))

    def test_version_match(self):
        cache = oslo_cache.get_cache()
        oslo_cache.register_config(self.conf)
        cache = oslo_cache.get_cache(self.conf)

        with mock.patch('redis.StrictRedis.info') as info:
            info.return_value = {'redis_version': '2.4.6'}
@ -15,7 +15,7 @@
import mock
import uuid

from zaqar.openstack.common.cache import cache as oslo_cache
from zaqar.common import cache as oslo_cache
from zaqar.storage import errors
from zaqar.storage import mongodb
from zaqar.storage import pooling

@ -34,7 +34,8 @@ class PoolCatalogTest(testing.TestBase):
    def setUp(self):
        super(PoolCatalogTest, self).setUp()

        cache = oslo_cache.get_cache()
        oslo_cache.register_config(self.conf)
        cache = oslo_cache.get_cache(self.conf)
        control = utils.load_storage_driver(self.conf, cache,
                                            control_mode=True)
@ -90,7 +90,7 @@ class MessagesBaseTest(base.V2Base):

        # Test GET on the message resource directly
        # NOTE(cpp-cabrera): force the passing of time to age a message
        timeutils_utcnow = 'zaqar.openstack.common.timeutils.utcnow'
        timeutils_utcnow = 'oslo_utils.timeutils.utcnow'
        now = timeutils.utcnow() + datetime.timedelta(seconds=10)
        with mock.patch(timeutils_utcnow) as mock_utcnow:
            mock_utcnow.return_value = now
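The same substitution repeats through the remaining hunks: patch targets move from the deleted zaqar.openstack.common.timeutils to oslo_utils. Standalone, the pattern is::

    import datetime

    import mock
    from oslo_utils import timeutils

    # Freeze "now" ten seconds in the future so previously posted
    # messages appear aged (mirrors the tests above and below).
    now = timeutils.utcnow() + datetime.timedelta(seconds=10)
    with mock.patch('oslo_utils.timeutils.utcnow') as mock_utcnow:
        mock_utcnow.return_value = now
        # exercise code that reads message age under the shifted clock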
@ -137,7 +137,7 @@ class TestClaimsMongoDB(base.V1Base):
        self.assertEqual(len(listed['messages']), len(claimed))

        now = timeutils.utcnow() + datetime.timedelta(seconds=10)
        timeutils_utcnow = 'zaqar.openstack.common.timeutils.utcnow'
        timeutils_utcnow = 'oslo_utils.timeutils.utcnow'
        with mock.patch(timeutils_utcnow) as mock_utcnow:
            mock_utcnow.return_value = now
            body = self.simulate_get(claim_href, self.project_id)
@ -102,7 +102,7 @@ class TestMessagesMongoDB(base.V1Base):

        # Test GET on the message resource directly
        # NOTE(cpp-cabrera): force the passing of time to age a message
        timeutils_utcnow = 'zaqar.openstack.common.timeutils.utcnow'
        timeutils_utcnow = 'oslo_utils.timeutils.utcnow'
        now = timeutils.utcnow() + datetime.timedelta(seconds=10)
        with mock.patch(timeutils_utcnow) as mock_utcnow:
            mock_utcnow.return_value = now
@ -184,7 +184,7 @@ class TestClaimsMongoDB(base.V1_1Base):
        self.assertEqual(len(listed['messages']), len(claimed))

        now = timeutils.utcnow() + datetime.timedelta(seconds=10)
        timeutils_utcnow = 'zaqar.openstack.common.timeutils.utcnow'
        timeutils_utcnow = 'oslo_utils.timeutils.utcnow'
        with mock.patch(timeutils_utcnow) as mock_utcnow:
            mock_utcnow.return_value = now
            body = self.simulate_get(claim_href, headers=self.headers)
@ -105,11 +105,12 @@ class TestFlavorsMongoDB(base.V1_1Base):
        self.assertEqual(self.srmock.status, falcon.HTTP_201)

    def tearDown(self):
        super(TestFlavorsMongoDB, self).tearDown()
        self.simulate_delete(self.queue_path)
        self.simulate_delete(self.flavor_path)
        self.assertEqual(self.srmock.status, falcon.HTTP_204)

        self.simulate_delete(self.queue_path)
        self.simulate_delete(self.pool_path)
        super(TestFlavorsMongoDB, self).tearDown()

    def test_put_flavor_works(self):
        name = str(uuid.uuid1())
@ -117,7 +117,7 @@ class TestMessagesMongoDB(base.V1_1Base):

        # Test GET on the message resource directly
        # NOTE(cpp-cabrera): force the passing of time to age a message
        timeutils_utcnow = 'zaqar.openstack.common.timeutils.utcnow'
        timeutils_utcnow = 'oslo_utils.timeutils.utcnow'
        now = timeutils.utcnow() + datetime.timedelta(seconds=10)
        with mock.patch(timeutils_utcnow) as mock_utcnow:
            mock_utcnow.return_value = now
@ -185,7 +185,7 @@ class TestClaimsMongoDB(base.V2Base):
        self.assertEqual(len(listed['messages']), len(claimed))

        now = timeutils.utcnow() + datetime.timedelta(seconds=10)
        timeutils_utcnow = 'zaqar.openstack.common.timeutils.utcnow'
        timeutils_utcnow = 'oslo_utils.timeutils.utcnow'
        with mock.patch(timeutils_utcnow) as mock_utcnow:
            mock_utcnow.return_value = now
            body = self.simulate_get(claim_href, headers=self.headers)
@ -104,11 +104,11 @@ class TestFlavorsMongoDB(base.V2Base):
        self.assertEqual(self.srmock.status, falcon.HTTP_201)

    def tearDown(self):
        self.simulate_delete(self.pool_path)
        self.simulate_delete(self.queue_path)
        self.simulate_delete(self.flavor_path)
        self.assertEqual(self.srmock.status, falcon.HTTP_204)
        self.simulate_delete(self.pool_path)

        self.simulate_delete(self.queue_path)
        super(TestFlavorsMongoDB, self).tearDown()

    def test_put_flavor_works(self):
@ -120,7 +120,7 @@ class TestMessagesMongoDB(base.V2Base):

        # Test GET on the message resource directly
        # NOTE(cpp-cabrera): force the passing of time to age a message
        timeutils_utcnow = 'zaqar.openstack.common.timeutils.utcnow'
        timeutils_utcnow = 'oslo_utils.timeutils.utcnow'
        now = timeutils.utcnow() + datetime.timedelta(seconds=10)
        with mock.patch(timeutils_utcnow) as mock_utcnow:
            mock_utcnow.return_value = now
@ -55,21 +55,16 @@ class TestSubscriptionsMongoDB(base.V2Base):
        self.subscription_path = (self.url_prefix + '/queues/' + self.queue +
                                  '/subscriptions')

        self.addCleanup(self._delete_subscription)

    def _delete_subscription(self, sid=None):
        if sid:
            self.simulate_delete(self.subscription_path + '/' + sid,
    def tearDown(self):
        resp = self.simulate_get(self.subscription_path,
                                 headers=self.headers)
        resp_doc = jsonutils.loads(resp[0])
        for s in resp_doc['subscriptions']:
            self.simulate_delete(self.subscription_path + '/' + s['id'],
                                 headers=self.headers)
        else:
            resp = self.simulate_get(self.subscription_path,
                                     headers=self.headers)
            resp_doc = jsonutils.loads(resp[0])
            for s in resp_doc['subscriptions']:
                self.simulate_delete(self.subscription_path + '/' + s['id'],
                                     headers=self.headers)

        self.simulate_delete(self.queue_path)
        super(TestSubscriptionsMongoDB, self).tearDown()

    def _create_subscription(self,
                             subscriber='http://triger.me',
@ -76,7 +76,7 @@ class DriverBase(object):
    :param storage: The storage driver
    :type storage: zaqar.storage.base.DataDriverBase
    :param cache: caching object
    :type cache: zaqar.openstack.common.cache.backends.BaseCache
    :type cache: dogpile.cache.region.CacheRegion
    :param control: Storage driver to handle the control plane
    :type control: zaqar.storage.base.ControlDriverBase
    """