Merge "fix: Checking whether queue exists adds latency"
This commit is contained in:
commit
83423b96eb
@ -15,16 +15,22 @@
|
||||
|
||||
import functools
|
||||
|
||||
import msgpack
|
||||
|
||||
def cached_getattr(meth):
|
||||
"""Caches attributes returned by __getattr__
|
||||
import marconi.openstack.common.log as logging
|
||||
|
||||
It can be used to cache results from
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def memoized_getattr(meth):
|
||||
"""Memoizes attributes returned by __getattr__
|
||||
|
||||
It can be used to remember the results from
|
||||
__getattr__ and reduce the debt of calling
|
||||
it again when the same attribute is accessed.
|
||||
|
||||
This decorator caches attributes by setting
|
||||
them in the object itself.
|
||||
This decorator memoizes attributes by setting
|
||||
them on the object itself.
|
||||
|
||||
The wrapper returned by this decorator won't alter
|
||||
the returned value.
|
||||
@ -40,6 +46,91 @@ def cached_getattr(meth):
|
||||
return wrapper
|
||||
|
||||
|
||||
def caches(keygen, ttl, cond=None):
|
||||
"""Flags a getter method as being cached using oslo.cache.
|
||||
|
||||
It is assumed that the containing class defines an attribute
|
||||
named `_cache` that is an instance of an oslo.cache backend.
|
||||
|
||||
The getter should raise an exception if the value can't be
|
||||
loaded, which will skip the caching step. Otherwise, the
|
||||
getter must return a value that can be encoded with
|
||||
msgpack.
|
||||
|
||||
Note that you can also flag a remover method such that it
|
||||
will purge an associated item from the cache, e.g.:
|
||||
|
||||
def project_cache_key(user, project=None):
|
||||
return user + ':' + str(project)
|
||||
|
||||
class Project(object):
|
||||
def __init__(self, db, cache):
|
||||
self._db = db
|
||||
self._cache = cache
|
||||
|
||||
@decorators.caches(project_cache_key, 60)
|
||||
def get_project(self, user, project=None):
|
||||
return self._db.get_project(user, project)
|
||||
|
||||
@get_project.purges
|
||||
def del_project(self, user, project=None):
|
||||
self._db.delete_project(user, project)
|
||||
|
||||
:param keygen: A static key generator function. This function
|
||||
must accept the same arguments as the getter, sans `self`.
|
||||
:param ttl: TTL for the cache entry, in seconds.
|
||||
:param cond: Conditional for whether or not to cache the
|
||||
value. Must be a function that takes a single value, and
|
||||
returns True or False.
|
||||
"""
|
||||
|
||||
def purges_prop(remover):
|
||||
|
||||
@functools.wraps(remover)
|
||||
def wrapper(self, *args, **kwargs):
|
||||
# First, purge from cache
|
||||
key = keygen(*args, **kwargs)
|
||||
del self._cache[key]
|
||||
|
||||
# Remove/delete from origin
|
||||
remover(self, *args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
def prop(getter):
|
||||
|
||||
@functools.wraps(getter)
|
||||
def wrapper(self, *args, **kwargs):
|
||||
key = keygen(*args, **kwargs)
|
||||
packed_value = self._cache.get(key)
|
||||
|
||||
if packed_value is None:
|
||||
value = getter(self, *args, **kwargs)
|
||||
|
||||
# Cache new value if desired
|
||||
if cond is None or cond(value):
|
||||
# NOTE(kgriffs): Setting use_bin_type is essential
|
||||
# for being able to distinguish between Unicode
|
||||
# and binary strings when decoding; otherwise,
|
||||
# both types are normalized to the MessagePack
|
||||
# str format family.
|
||||
packed_value = msgpack.packb(value, use_bin_type=True)
|
||||
|
||||
if not self._cache.set(key, packed_value, ttl):
|
||||
LOG.warn('Failed to cache key: ' + key)
|
||||
else:
|
||||
# NOTE(kgriffs): unpackb does not default to UTF-8,
|
||||
# so we have to explicitly ask for it.
|
||||
value = msgpack.unpackb(packed_value, encoding='utf-8')
|
||||
|
||||
return value
|
||||
|
||||
wrapper.purges = purges_prop
|
||||
return wrapper
|
||||
|
||||
return prop
|
||||
|
||||
|
||||
def lazy_property(write=False, delete=True):
|
||||
"""Creates a lazy property.
|
||||
|
||||
|
@ -48,7 +48,7 @@ class Pipeline(object):
|
||||
def append(self, stage):
|
||||
self._pipeline.append(stage)
|
||||
|
||||
@decorators.cached_getattr
|
||||
@decorators.memoized_getattr
|
||||
def __getattr__(self, name):
|
||||
with self.consumer_for(name) as consumer:
|
||||
return consumer
|
||||
|
@ -33,7 +33,7 @@ class DriverBase(object):
|
||||
:type conf: `oslo.config.ConfigOpts`
|
||||
:param cache: Cache instance to use for reducing latency
|
||||
for certain lookups.
|
||||
:type cache: `marconi.common.cache.backends.BaseCache`
|
||||
:type cache: `marconi.openstack.common.cache.backends.BaseCache`
|
||||
"""
|
||||
def __init__(self, conf, cache):
|
||||
self.conf = conf
|
||||
@ -54,7 +54,7 @@ class DataDriverBase(DriverBase):
|
||||
:type conf: `oslo.config.ConfigOpts`
|
||||
:param cache: Cache instance to use for reducing latency
|
||||
for certain lookups.
|
||||
:type cache: `marconi.common.cache.backends.BaseCache`
|
||||
:type cache: `marconi.openstack.common.cache.backends.BaseCache`
|
||||
"""
|
||||
|
||||
def __init__(self, conf, cache):
|
||||
@ -96,7 +96,7 @@ class ControlDriverBase(DriverBase):
|
||||
:type conf: `oslo.config.ConfigOpts`
|
||||
:param cache: Cache instance to use for reducing latency
|
||||
for certain lookups.
|
||||
:type cache: `marconi.common.cache.backends.BaseCache`
|
||||
:type cache: `marconi.openstack.common.cache.backends.BaseCache`
|
||||
"""
|
||||
|
||||
@abc.abstractproperty
|
||||
|
@ -23,6 +23,7 @@ Field Mappings:
|
||||
|
||||
import pymongo.errors
|
||||
|
||||
from marconi.common import decorators
|
||||
from marconi.openstack.common.gettextutils import _
|
||||
import marconi.openstack.common.log as logging
|
||||
from marconi.openstack.common import timeutils
|
||||
@ -30,9 +31,33 @@ from marconi.queues import storage
|
||||
from marconi.queues.storage import errors
|
||||
from marconi.queues.storage.mongodb import utils
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
# NOTE(kgriffs): E.g.: 'marconi-queuecontroller:5083853/my-queue'
|
||||
_QUEUE_CACHE_PREFIX = 'queuecontroller:'
|
||||
|
||||
# NOTE(kgriffs): This causes some race conditions, but they are
|
||||
# harmless. If a queue was deleted, but we are still returning
|
||||
# that it exists, some messages may get inserted without the
|
||||
# client getting an error. In this case, those messages would
|
||||
# be orphaned and expire eventually according to their TTL.
|
||||
#
|
||||
# What this means for the client is that they have a bug; they
|
||||
# deleted a queue and then immediately tried to post messages
|
||||
# to it. If they keep trying to use the queue, they will
|
||||
# eventually start getting an error, once the cache entry
|
||||
# expires, which should clue them in on what happened.
|
||||
#
|
||||
# TODO(kgriffs): Make dynamic?
|
||||
_QUEUE_CACHE_TTL = 5
|
||||
|
||||
|
||||
def _queue_exists_key(queue, project=None):
|
||||
# NOTE(kgriffs): Use string concatenation for performance,
|
||||
# also put project first since it is guaranteed to be
|
||||
# unique, which should reduce lookup time.
|
||||
return _QUEUE_CACHE_PREFIX + 'exists:' + str(project) + '/' + queue
|
||||
|
||||
|
||||
class QueueController(storage.Queue):
|
||||
"""Implements queue resource operations using MongoDB.
|
||||
@ -59,6 +84,7 @@ class QueueController(storage.Queue):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(QueueController, self).__init__(*args, **kwargs)
|
||||
|
||||
self._cache = self.driver.cache
|
||||
self._collection = self.driver.queues_database.queues
|
||||
|
||||
# NOTE(flaper87): This creates a unique index for
|
||||
@ -224,8 +250,11 @@ class QueueController(storage.Queue):
|
||||
else:
|
||||
return True
|
||||
|
||||
# NOTE(kgriffs): Only cache when it exists; if it doesn't exist, and
|
||||
# someone creates it, we want it to be immediately visible.
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_autoreconnect
|
||||
@decorators.caches(_queue_exists_key, _QUEUE_CACHE_TTL, lambda v: v)
|
||||
def exists(self, name, project=None):
|
||||
query = _get_scoped_query(name, project)
|
||||
return self._collection.find_one(query) is not None
|
||||
@ -243,6 +272,7 @@ class QueueController(storage.Queue):
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_autoreconnect
|
||||
@exists.purges
|
||||
def delete(self, name, project=None):
|
||||
self.driver.message_controller._purge_queue(name, project)
|
||||
self._collection.remove(_get_scoped_query(name, project))
|
||||
|
@ -41,7 +41,7 @@ _SHARD_CACHE_PREFIX = 'sharding:'
|
||||
# before "unfreezing" the queue, rather than waiting
|
||||
# on the TTL.
|
||||
#
|
||||
# TODO(kgriffs): Make configurable?
|
||||
# TODO(kgriffs): Make dynamic?
|
||||
_SHARD_CACHE_TTL = 10
|
||||
|
||||
|
||||
@ -106,7 +106,7 @@ class RoutingController(storage.base.ControllerBase):
|
||||
self._ctrl_property_name = self._resource_name + '_controller'
|
||||
self._shard_catalog = shard_catalog
|
||||
|
||||
@decorators.cached_getattr
|
||||
@decorators.memoized_getattr
|
||||
def __getattr__(self, name):
|
||||
# NOTE(kgriffs): Use a closure trick to avoid
|
||||
# some attr lookups each time forward() is called.
|
||||
@ -358,6 +358,7 @@ class Catalog(object):
|
||||
conf = utils.dynamic_conf(shard['uri'], shard['options'])
|
||||
return utils.load_storage_driver(conf, self._cache)
|
||||
|
||||
@decorators.caches(_shard_cache_key, _SHARD_CACHE_TTL)
|
||||
def _shard_id(self, queue, project=None):
|
||||
"""Get the ID for the shard assigned to the given queue.
|
||||
|
||||
@ -368,19 +369,7 @@ class Catalog(object):
|
||||
|
||||
:raises: `errors.QueueNotMapped`
|
||||
"""
|
||||
cache_key = _shard_cache_key(queue, project)
|
||||
shard_id = self._cache.get(cache_key)
|
||||
|
||||
if shard_id is None:
|
||||
shard_id = self._catalogue_ctrl.get(project, queue)['shard']
|
||||
|
||||
if not self._cache.set(cache_key, shard_id, _SHARD_CACHE_TTL):
|
||||
LOG.warn('Failed to cache shard ID')
|
||||
|
||||
return shard_id
|
||||
|
||||
def _invalidate_cached_id(self, queue, project=None):
|
||||
self._cache.unset_many([_shard_cache_key(queue, project)])
|
||||
return self._catalogue_ctrl.get(project, queue)['shard']
|
||||
|
||||
def register(self, queue, project=None):
|
||||
"""Register a new queue in the shard catalog.
|
||||
@ -413,6 +402,7 @@ class Catalog(object):
|
||||
|
||||
self._catalogue_ctrl.insert(project, queue, shard['name'])
|
||||
|
||||
@_shard_id.purges
|
||||
def deregister(self, queue, project=None):
|
||||
"""Removes a queue from the shard catalog.
|
||||
|
||||
@ -425,7 +415,6 @@ class Catalog(object):
|
||||
None for the "global" or "generic" project.
|
||||
:type project: six.text_type
|
||||
"""
|
||||
self._invalidate_cached_id(queue, project)
|
||||
self._catalogue_ctrl.delete(project, queue)
|
||||
|
||||
def lookup(self, queue, project=None):
|
||||
|
@ -41,7 +41,7 @@ class DriverBase(object):
|
||||
:param storage: The storage driver
|
||||
:type storage: marconi.queues.storage.base.DataDriverBase
|
||||
:param cache: caching object
|
||||
:type cache: marconi.common.cache.backends.BaseCache
|
||||
:type cache: marconi.openstack.common.cache.backends.BaseCache
|
||||
:param control: Storage driver to handle the control plane
|
||||
:type control: marconi.queues.storage.base.ControlDriverBase
|
||||
"""
|
||||
|
@ -1,36 +0,0 @@
|
||||
# Copyright (c) 2013 Rackspace, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
||||
# use this file except in compliance with the License. You may obtain a copy
|
||||
# of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations under
|
||||
# the License.
|
||||
|
||||
import traceback
|
||||
|
||||
from marconi import tests as testing
|
||||
|
||||
|
||||
class TestCache(testing.TestBase):
|
||||
|
||||
def test_import(self):
|
||||
try:
|
||||
from marconi.common.cache._backends import memcached
|
||||
from marconi.common.cache._backends import memory
|
||||
from marconi.common.cache import backends
|
||||
from marconi.common.cache import cache
|
||||
|
||||
except ImportError as ex:
|
||||
self.fail(traceback.format_exc(ex))
|
||||
|
||||
# Avoid pyflakes warnings
|
||||
cache = cache
|
||||
backends = backends
|
||||
memory = memory
|
||||
memcached = memcached
|
@ -13,17 +13,21 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import msgpack
|
||||
from oslo.config import cfg
|
||||
|
||||
from marconi.common import decorators
|
||||
from marconi.openstack.common.cache import cache as oslo_cache
|
||||
from marconi.tests import base
|
||||
|
||||
|
||||
class TestDecorators(base.TestBase):
|
||||
|
||||
def test_cached_getattr(self):
|
||||
def test_memoized_getattr(self):
|
||||
|
||||
class TestClass(object):
|
||||
|
||||
@decorators.cached_getattr
|
||||
@decorators.memoized_getattr
|
||||
def __getattr__(self, name):
|
||||
return name
|
||||
|
||||
@ -31,3 +35,104 @@ class TestDecorators(base.TestBase):
|
||||
result = instance.testing
|
||||
self.assertEqual(result, 'testing')
|
||||
self.assertIn('testing', instance.__dict__)
|
||||
|
||||
def test_cached(self):
|
||||
conf = cfg.ConfigOpts()
|
||||
oslo_cache.register_oslo_configs(conf)
|
||||
cache = oslo_cache.get_cache(conf.cache_url)
|
||||
|
||||
sample_project = {
|
||||
u'name': u'Cats Abound',
|
||||
u'bits': b'\x80\x81\x82\x83\x84',
|
||||
b'key': u'Value. \x80',
|
||||
}
|
||||
|
||||
def create_key(user, project=None):
|
||||
return user + ':' + str(project)
|
||||
|
||||
class TestClass(object):
|
||||
|
||||
def __init__(self, cache):
|
||||
self._cache = cache
|
||||
self.project_gets = 0
|
||||
self.project_dels = 0
|
||||
|
||||
@decorators.caches(create_key, 60)
|
||||
def get_project(self, user, project=None):
|
||||
self.project_gets += 1
|
||||
return sample_project
|
||||
|
||||
@get_project.purges
|
||||
def del_project(self, user, project=None):
|
||||
self.project_dels += 1
|
||||
|
||||
instance = TestClass(cache)
|
||||
|
||||
args = ('23', 'cats')
|
||||
|
||||
project = instance.get_project(*args)
|
||||
self.assertEqual(project, sample_project)
|
||||
self.assertEqual(instance.project_gets, 1)
|
||||
|
||||
# Should be in the cache now.
|
||||
project = msgpack.unpackb(cache.get(create_key(*args)),
|
||||
encoding='utf-8')
|
||||
self.assertEqual(project, sample_project)
|
||||
|
||||
# Should read from the cache this time (counter will not
|
||||
# be incremented).
|
||||
project = instance.get_project(*args)
|
||||
self.assertEqual(project, sample_project)
|
||||
self.assertEqual(instance.project_gets, 1)
|
||||
|
||||
# Use kwargs this time
|
||||
instance.del_project('23', project='cats')
|
||||
self.assertEqual(instance.project_dels, 1)
|
||||
|
||||
# Should be a cache miss since we purged (above)
|
||||
project = instance.get_project(*args)
|
||||
self.assertEqual(instance.project_gets, 2)
|
||||
|
||||
def test_cached_with_cond(self):
|
||||
conf = cfg.ConfigOpts()
|
||||
oslo_cache.register_oslo_configs(conf)
|
||||
cache = oslo_cache.get_cache(conf.cache_url)
|
||||
|
||||
class TestClass(object):
|
||||
|
||||
def __init__(self, cache):
|
||||
self._cache = cache
|
||||
self.user_gets = 0
|
||||
|
||||
@decorators.caches(lambda x: x, 60, lambda v: v != 'kgriffs')
|
||||
def get_user(self, name):
|
||||
self.user_gets += 1
|
||||
return name
|
||||
|
||||
instance = TestClass(cache)
|
||||
|
||||
name = 'malini'
|
||||
|
||||
user = instance.get_user(name)
|
||||
self.assertEqual(user, name)
|
||||
self.assertEqual(instance.user_gets, 1)
|
||||
|
||||
# Should be in the cache now.
|
||||
user = msgpack.unpackb(cache.get(name))
|
||||
self.assertEqual(user, name)
|
||||
|
||||
# Should read from the cache this time (counter will not
|
||||
# be incremented).
|
||||
user = instance.get_user(name)
|
||||
self.assertEqual(user, name)
|
||||
self.assertEqual(instance.user_gets, 1)
|
||||
|
||||
# Won't go into the cache because of cond
|
||||
name = 'kgriffs'
|
||||
for i in range(3):
|
||||
user = instance.get_user(name)
|
||||
|
||||
self.assertEqual(cache.get(name), None)
|
||||
|
||||
self.assertEqual(user, name)
|
||||
self.assertEqual(instance.user_gets, 2 + i)
|
||||
|
Loading…
x
Reference in New Issue
Block a user