Merge "Clean up pooling meta-controllers"

This commit is contained in:
Jenkins 2014-12-18 17:01:40 +00:00 committed by Gerrit Code Review
commit d774c21362
3 changed files with 118 additions and 120 deletions

View File

@ -489,7 +489,7 @@ class PooledMessageTests(base.MessageControllerTest):
controller_class = pooling.MessageController
driver_class = pooling.DataDriver
control_driver_class = mongodb.ControlDriver
controller_base_class = pooling.RoutingController
controller_base_class = storage.Message
@testing.requires_mongodb
@ -498,7 +498,7 @@ class PooledQueueTests(base.QueueControllerTest):
controller_class = pooling.QueueController
driver_class = pooling.DataDriver
control_driver_class = mongodb.ControlDriver
controller_base_class = pooling.RoutingController
controller_base_class = storage.Queue
@testing.requires_mongodb
@ -507,7 +507,7 @@ class PooledClaimsTests(base.ClaimControllerTest):
controller_class = pooling.ClaimController
driver_class = pooling.DataDriver
control_driver_class = mongodb.ControlDriver
controller_base_class = pooling.RoutingController
controller_base_class = storage.Claim
def test_delete_message_expired_claim(self):
# NOTE(flaper87): The pool tests uses sqlalchemy

View File

@ -19,6 +19,7 @@ import six
import sqlalchemy as sa
from sqlalchemy.sql import func as sfunc
from zaqar import storage
from zaqar.storage import pooling
from zaqar.storage import sqlalchemy
from zaqar.storage.sqlalchemy import controllers
@ -153,7 +154,7 @@ class PooledMessageTests(base.MessageControllerTest):
controller_class = pooling.MessageController
driver_class = pooling.DataDriver
control_driver_class = sqlalchemy.ControlDriver
controller_base_class = pooling.RoutingController
controller_base_class = storage.Message
class PooledClaimsTests(base.ClaimControllerTest):
@ -161,7 +162,7 @@ class PooledClaimsTests(base.ClaimControllerTest):
controller_class = pooling.ClaimController
driver_class = pooling.DataDriver
control_driver_class = sqlalchemy.ControlDriver
controller_base_class = pooling.RoutingController
controller_base_class = storage.Claim
def test_delete_message_expired_claim(self):
# NOTE(flaper87): Several reasons to do this:
@ -177,7 +178,7 @@ class PooledQueueTests(base.QueueControllerTest):
controller_class = pooling.QueueController
driver_class = pooling.DataDriver
control_driver_class = sqlalchemy.ControlDriver
controller_base_class = pooling.RoutingController
controller_base_class = storage.Queue
class MsgidTests(testing.TestBase):

View File

@ -122,55 +122,20 @@ class DataDriver(storage.DataDriverBase):
return ClaimController(self._pool_catalog)
class RoutingController(storage.base.ControllerBase):
"""Routes operations to the appropriate pool.
class QueueController(storage.Queue):
"""Routes operations to a queue controller in the appropriate pool.
This controller stands in for a regular storage controller,
routing operations to a driver instance that represents
the pool to which the queue has been assigned.
Do not instantiate this class directly; use one of the
more specific child classes instead.
:param pool_catalog: a catalog of available pools
:type pool_catalog: queues.pooling.base.Catalog
"""
_resource_name = None
def __init__(self, pool_catalog):
super(RoutingController, self).__init__(None)
self._ctrl_property_name = self._resource_name + '_controller'
super(QueueController, self).__init__(None)
self._pool_catalog = pool_catalog
self._get_controller = self._pool_catalog.get_queue_controller
@decorators.memoized_getattr
def __getattr__(self, name):
# NOTE(kgriffs): Use a closure trick to avoid
# some attr lookups each time forward() is called.
lookup = self._pool_catalog.lookup
# NOTE(kgriffs): Assume that every controller method
# that is exposed to the transport declares queue name
# as its first arg. The only exception to this
# is QueueController.list
def forward(queue, *args, **kwargs):
# NOTE(kgriffs): Using .get since 'project' is an
# optional argument.
storage = lookup(queue, kwargs.get('project'))
target_ctrl = getattr(storage, self._ctrl_property_name)
return getattr(target_ctrl, name)(queue, *args, **kwargs)
return forward
class QueueController(RoutingController):
"""Controller to facilitate special processing for queue operations."""
_resource_name = 'queue'
def __init__(self, pool_catalog):
super(QueueController, self).__init__(pool_catalog)
self._lookup = self._pool_catalog.lookup
def list(self, project=None, marker=None,
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False):
def _list(self, project=None, marker=None,
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False):
def all_pages():
cursor = self._pool_catalog._pools_ctrl.list(limit=0)
@ -199,13 +164,13 @@ class QueueController(RoutingController):
yield it()
yield marker_name['next']
def get(self, name, project=None):
def _get(self, name, project=None):
try:
return self.get_metadata(name, project)
except errors.QueueDoesNotExist:
return {}
def create(self, name, metadata=None, project=None):
def _create(self, name, metadata=None, project=None):
flavor = metadata and metadata.get('_flavor', None)
self._pool_catalog.register(name, project=project, flavor=flavor)
@ -214,19 +179,16 @@ class QueueController(RoutingController):
# however. If between the time we register a queue and go to
# look it up, the queue is deleted, then this assertion will
# fail.
target = self._lookup(name, project)
if not target:
control = self._get_controller(name, project)
if not control:
raise RuntimeError('Failed to register queue')
return control.create(name, metadata=metadata, project=project)
return target.queue_controller.create(name,
metadata=metadata,
project=project)
def delete(self, name, project=None):
def _delete(self, name, project=None):
# NOTE(cpp-cabrera): If we fail to find a project/queue in the
# catalogue for a delete, just ignore it.
target = self._lookup(name, project)
if target:
control = self._get_controller(name, project)
if control:
# NOTE(cpp-cabrera): delete from the catalogue first. If
# zaqar crashes in the middle of these two operations,
@ -237,86 +199,81 @@ class QueueController(RoutingController):
# functionally equivalent 204 on a create queue. The
# latter case is more difficult to reason about, and may
# yield 500s in some operations.
control = target.queue_controller
self._pool_catalog.deregister(name, project)
ret = control.delete(name, project)
return ret
return None
def exists(self, name, project=None, **kwargs):
target = self._lookup(name, project)
if target:
control = target.queue_controller
def _exists(self, name, project=None):
control = self._get_controller(name, project)
if control:
return control.exists(name, project=project)
return False
def get_metadata(self, name, project=None):
target = self._lookup(name, project)
if target:
control = target.queue_controller
control = self._get_controller(name, project)
if control:
return control.get_metadata(name, project=project)
raise errors.QueueDoesNotExist(name, project)
def set_metadata(self, name, metadata, project=None):
target = self._lookup(name, project)
if target:
control = target.queue_controller
control = self._get_controller(name, project)
if control:
return control.set_metadata(name, metadata=metadata,
project=project)
raise errors.QueueDoesNotExist(name, project)
def stats(self, name, project=None):
target = self._lookup(name, project)
if target:
control = target.queue_controller
def _stats(self, name, project=None):
control = self._get_controller(name, project)
if control:
return control.stats(name, project=project)
raise errors.QueueDoesNotExist(name, project)
class MessageController(RoutingController):
_resource_name = 'message'
class MessageController(storage.Message):
"""Routes operations to a message controller in the appropriate pool.
:param pool_catalog: a catalog of available pools
:type pool_catalog: queues.pooling.base.Catalog
"""
def __init__(self, pool_catalog):
super(MessageController, self).__init__(pool_catalog)
self._lookup = self._pool_catalog.lookup
super(MessageController, self).__init__(None)
self._pool_catalog = pool_catalog
self._get_controller = self._pool_catalog.get_message_controller
def post(self, queue, messages, client_uuid, project=None):
target = self._lookup(queue, project)
if target:
control = target.message_controller
control = self._get_controller(queue, project)
if control:
return control.post(queue, project=project,
messages=messages,
client_uuid=client_uuid)
raise errors.QueueDoesNotExist(queue, project)
def delete(self, queue, message_id, project=None, claim=None):
target = self._lookup(queue, project)
if target:
control = target.message_controller
control = self._get_controller(queue, project)
if control:
return control.delete(queue, project=project,
message_id=message_id, claim=claim)
return None
def bulk_delete(self, queue, message_ids=None, project=None):
target = self._lookup(queue, project)
if target:
control = target.message_controller
def bulk_delete(self, queue, message_ids, project=None):
control = self._get_controller(queue, project)
if control:
return control.bulk_delete(queue, project=project,
message_ids=message_ids)
return None
def pop(self, queue, limit, project=None):
target = self._lookup(queue, project)
if target:
control = target.message_controller
control = self._get_controller(queue, project)
if control:
return control.pop(queue, project=project, limit=limit)
return None
def bulk_get(self, queue, message_ids, project=None):
target = self._lookup(queue, project)
if target:
control = target.message_controller
control = self._get_controller(queue, project)
if control:
return control.bulk_get(queue, project=project,
message_ids=message_ids)
return []
@ -324,9 +281,8 @@ class MessageController(RoutingController):
def list(self, queue, project=None, marker=None,
limit=storage.DEFAULT_MESSAGES_PER_PAGE,
echo=False, client_uuid=None, include_claimed=False):
target = self._lookup(queue, project)
if target:
control = target.message_controller
control = self._get_controller(queue, project)
if control:
return control.list(queue, project=project,
marker=marker, limit=limit,
echo=echo, client_uuid=client_uuid,
@ -334,57 +290,56 @@ class MessageController(RoutingController):
return iter([[]])
def get(self, queue, message_id, project=None):
target = self._lookup(queue, project)
if target:
control = target.message_controller
control = self._get_controller(queue, project)
if control:
return control.get(queue, message_id=message_id,
project=project)
raise errors.QueueDoesNotExist(queue, project)
def first(self, queue, project=None, sort=1):
target = self._lookup(queue, project)
if target:
control = target.message_controller
control = self._get_controller(queue, project)
if control:
return control.first(queue, project=project, sort=sort)
raise errors.QueueDoesNotExist(queue, project)
class ClaimController(RoutingController):
_resource_name = 'claim'
class ClaimController(storage.Claim):
"""Routes operations to a claim controller in the appropriate pool.
:param pool_catalog: a catalog of available pools
:type pool_catalog: queues.pooling.base.Catalog
"""
def __init__(self, pool_catalog):
super(ClaimController, self).__init__(pool_catalog)
self._lookup = self._pool_catalog.lookup
super(ClaimController, self).__init__(None)
self._pool_catalog = pool_catalog
self._get_controller = self._pool_catalog.get_claim_controller
def create(self, queue, metadata, project=None,
limit=storage.DEFAULT_MESSAGES_PER_CLAIM):
target = self._lookup(queue, project)
if target:
control = target.claim_controller
control = self._get_controller(queue, project)
if control:
return control.create(queue, metadata=metadata,
project=project, limit=limit)
return [None, []]
def get(self, queue, claim_id, project=None):
target = self._lookup(queue, project)
if target:
control = target.claim_controller
control = self._get_controller(queue, project)
if control:
return control.get(queue, claim_id=claim_id,
project=project)
raise errors.ClaimDoesNotExist(claim_id, queue, project)
def update(self, queue, claim_id, metadata, project=None):
target = self._lookup(queue, project)
if target:
control = target.claim_controller
control = self._get_controller(queue, project)
if control:
return control.update(queue, claim_id=claim_id,
project=project, metadata=metadata)
raise errors.ClaimDoesNotExist(claim_id, queue, project)
def delete(self, queue, claim_id, project=None):
target = self._lookup(queue, project)
if target:
control = target.claim_controller
control = self._get_controller(queue, project)
if control:
return control.delete(queue, claim_id=claim_id,
project=project)
return None
@ -494,6 +449,48 @@ class Catalog(object):
"""
self._catalogue_ctrl.delete(project, queue)
def get_queue_controller(self, queue, project=None):
"""Lookup the queue controller for the given queue and project.
:param queue: Name of the queue for which to find a pool
:param project: Project to which the queue belongs, or
None to specify the "global" or "generic" project.
:returns: The queue controller associated with the data driver for
the pool containing (queue, project) or None if this doesn't exist.
:rtype: Maybe QueueController
"""
target = self.lookup(queue, project)
return target and target.queue_controller
def get_message_controller(self, queue, project=None):
"""Lookup the message controller for the given queue and project.
:param queue: Name of the queue for which to find a pool
:param project: Project to which the queue belongs, or
None to specify the "global" or "generic" project.
:returns: The message controller associated with the data driver for
the pool containing (queue, project) or None if this doesn't exist.
:rtype: Maybe MessageController
"""
target = self.lookup(queue, project)
return target and target.message_controller
def get_claim_controller(self, queue, project=None):
"""Lookup the claim controller for the given queue and project.
:param queue: Name of the queue for which to find a pool
:param project: Project to which the queue belongs, or
None to specify the "global" or "generic" project.
:returns: The claim controller associated with the data driver for
the pool containing (queue, project) or None if this doesn't exist.
:rtype: Maybe ClaimController
"""
target = self.lookup(queue, project)
return target and target.claim_controller
def lookup(self, queue, project=None):
"""Lookup a pool driver for the given queue and project.