Wrap abstract method with base methods
In order to be able to move some validations and general checks, that are currently done in the transport, down to the storage layer, we need to wrap the methods being overloaded with other methods that are defined in the base class. This will allow us to add logic before and after the actual method call if needed. This is the first patch of a series of patches that will update all the storage controllers. Partial-blueprint: expose-storage-capabilities Change-Id: Iff42e3b775ed63661bc69fc1925242ff0e574e46
This commit is contained in:
parent
55624816bd
commit
6ae5372629
@ -27,6 +27,7 @@ import six
|
||||
|
||||
import zaqar.openstack.common.log as logging
|
||||
|
||||
|
||||
DEFAULT_QUEUES_PER_PAGE = 10
|
||||
DEFAULT_MESSAGES_PER_PAGE = 10
|
||||
DEFAULT_POOLS_PER_PAGE = 10
|
||||
@ -279,7 +280,6 @@ class Queue(ControllerBase):
|
||||
numbers of queues.
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def list(self, project=None, marker=None,
|
||||
limit=DEFAULT_QUEUES_PER_PAGE, detailed=False):
|
||||
"""Base method for listing queues.
|
||||
@ -292,9 +292,10 @@ class Queue(ControllerBase):
|
||||
:returns: An iterator giving a sequence of queues
|
||||
and the marker of the next page.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
return self._list(project, marker, limit, detailed)
|
||||
|
||||
_list = abc.abstractmethod(lambda x: None)
|
||||
|
||||
@abc.abstractmethod
|
||||
def get(self, name, project=None):
|
||||
"""Base method for queue metadata retrieval.
|
||||
|
||||
@ -304,7 +305,9 @@ class Queue(ControllerBase):
|
||||
:returns: Dictionary containing queue metadata
|
||||
:raises: DoesNotExist
|
||||
"""
|
||||
raise NotImplementedError
|
||||
return self._get(name, project)
|
||||
|
||||
_get = abc.abstractmethod(lambda x: None)
|
||||
|
||||
def get_metadata(self, name, project=None):
|
||||
"""Base method for queue metadata retrieval.
|
||||
@ -317,28 +320,6 @@ class Queue(ControllerBase):
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def create(self, name, metadata=None, project=None):
|
||||
"""Base method for queue creation.
|
||||
|
||||
:param name: The queue name
|
||||
:param project: Project id
|
||||
:returns: True if a queue was created and False
|
||||
if it was updated.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def exists(self, name, project=None):
|
||||
"""Base method for testing queue existence.
|
||||
|
||||
:param name: The queue name
|
||||
:param project: Project id
|
||||
:returns: True if a queue exists and False
|
||||
if it does not.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def set_metadata(self, name, metadata, project=None):
|
||||
"""Base method for updating a queue metadata.
|
||||
|
||||
@ -349,16 +330,40 @@ class Queue(ControllerBase):
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def create(self, name, metadata=None, project=None):
|
||||
"""Base method for queue creation.
|
||||
|
||||
:param name: The queue name
|
||||
:param project: Project id
|
||||
:returns: True if a queue was created and False
|
||||
if it was updated.
|
||||
"""
|
||||
return self._create(name, metadata, project)
|
||||
|
||||
_create = abc.abstractmethod(lambda x: None)
|
||||
|
||||
def exists(self, name, project=None):
|
||||
"""Base method for testing queue existence.
|
||||
|
||||
:param name: The queue name
|
||||
:param project: Project id
|
||||
:returns: True if a queue exists and False
|
||||
if it does not.
|
||||
"""
|
||||
return self._exists(name, project)
|
||||
|
||||
_exists = abc.abstractmethod(lambda x: None)
|
||||
|
||||
def delete(self, name, project=None):
|
||||
"""Base method for deleting a queue.
|
||||
|
||||
:param name: The queue name
|
||||
:param project: Project id
|
||||
"""
|
||||
raise NotImplementedError
|
||||
return self._delete(name, project)
|
||||
|
||||
_delete = abc.abstractmethod(lambda x: None)
|
||||
|
||||
@abc.abstractmethod
|
||||
def stats(self, name, project=None):
|
||||
"""Base method for queue stats.
|
||||
|
||||
@ -367,7 +372,9 @@ class Queue(ControllerBase):
|
||||
:returns: Dictionary with the
|
||||
queue stats
|
||||
"""
|
||||
raise NotImplementedError
|
||||
return self._stats(name, project)
|
||||
|
||||
_stats = abc.abstractmethod(lambda x: None)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
|
@ -188,14 +188,14 @@ class QueueController(storage.Queue):
|
||||
# Interface
|
||||
# ----------------------------------------------------------------------
|
||||
|
||||
def get(self, name, project=None):
|
||||
def _get(self, name, project=None):
|
||||
try:
|
||||
return self.get_metadata(name, project)
|
||||
except errors.QueueDoesNotExist:
|
||||
return {}
|
||||
|
||||
def list(self, project=None, marker=None,
|
||||
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False):
|
||||
def _list(self, project=None, marker=None,
|
||||
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False):
|
||||
|
||||
query = utils.scoped_query(marker, project)
|
||||
|
||||
@ -229,7 +229,7 @@ class QueueController(storage.Queue):
|
||||
|
||||
@utils.raises_conn_error
|
||||
# @utils.retries_on_autoreconnect
|
||||
def create(self, name, metadata=None, project=None):
|
||||
def _create(self, name, metadata=None, project=None):
|
||||
# NOTE(flaper87): If the connection fails after it was called
|
||||
# and we retry to insert the queue, we could end up returning
|
||||
# `False` because of the `DuplicatedKeyError` although the
|
||||
@ -259,7 +259,7 @@ class QueueController(storage.Queue):
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_autoreconnect
|
||||
@decorators.caches(_queue_exists_key, _QUEUE_CACHE_TTL, lambda v: v)
|
||||
def exists(self, name, project=None):
|
||||
def _exists(self, name, project=None):
|
||||
query = _get_scoped_query(name, project)
|
||||
return self._collection.find_one(query) is not None
|
||||
|
||||
@ -276,14 +276,14 @@ class QueueController(storage.Queue):
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_autoreconnect
|
||||
@exists.purges
|
||||
def delete(self, name, project=None):
|
||||
@_exists.purges
|
||||
def _delete(self, name, project=None):
|
||||
self.driver.message_controller._purge_queue(name, project)
|
||||
self._collection.remove(_get_scoped_query(name, project))
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_autoreconnect
|
||||
def stats(self, name, project=None):
|
||||
def _stats(self, name, project=None):
|
||||
if not self.exists(name, project=project):
|
||||
raise errors.QueueDoesNotExist(name, project)
|
||||
|
||||
|
@ -86,8 +86,8 @@ class QueueController(storage.Queue):
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_connection_error
|
||||
def list(self, project=None, marker=None,
|
||||
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False):
|
||||
def _list(self, project=None, marker=None,
|
||||
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False):
|
||||
client = self._client
|
||||
qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project)
|
||||
marker = utils.scope_queue_name(marker, project)
|
||||
@ -109,12 +109,12 @@ class QueueController(storage.Queue):
|
||||
yield utils.QueueListCursor(self._client, cursor, denormalizer)
|
||||
yield marker_next and marker_next['next']
|
||||
|
||||
def get(self, name, project=None):
|
||||
def _get(self, name, project=None):
|
||||
"""Obtain the metadata from the queue."""
|
||||
return self.get_metadata(name, project)
|
||||
|
||||
@utils.raises_conn_error
|
||||
def create(self, name, metadata=None, project=None):
|
||||
def _create(self, name, metadata=None, project=None):
|
||||
# TODO(prashanthr_): Implement as a lua script.
|
||||
queue_key = utils.scope_queue_name(name, project)
|
||||
qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project)
|
||||
@ -144,7 +144,7 @@ class QueueController(storage.Queue):
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_connection_error
|
||||
def exists(self, name, project=None):
|
||||
def _exists(self, name, project=None):
|
||||
# TODO(prashanthr_): Cache this lookup
|
||||
queue_key = utils.scope_queue_name(name, project)
|
||||
qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project)
|
||||
@ -175,7 +175,7 @@ class QueueController(storage.Queue):
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_connection_error
|
||||
def delete(self, name, project=None):
|
||||
def _delete(self, name, project=None):
|
||||
queue_key = utils.scope_queue_name(name, project)
|
||||
qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project)
|
||||
|
||||
@ -190,7 +190,7 @@ class QueueController(storage.Queue):
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_connection_error
|
||||
def stats(self, name, project=None):
|
||||
def _stats(self, name, project=None):
|
||||
if not self.exists(name, project=project):
|
||||
raise errors.QueueDoesNotExist(name, project)
|
||||
|
||||
|
@ -23,8 +23,8 @@ from zaqar.storage.sqlalchemy import utils
|
||||
|
||||
class QueueController(storage.Queue):
|
||||
|
||||
def list(self, project, marker=None,
|
||||
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False):
|
||||
def _list(self, project, marker=None,
|
||||
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False):
|
||||
|
||||
if project is None:
|
||||
project = ''
|
||||
@ -68,13 +68,13 @@ class QueueController(storage.Queue):
|
||||
except utils.NoResult:
|
||||
raise errors.QueueDoesNotExist(name, project)
|
||||
|
||||
def get(self, name, project=None):
|
||||
def _get(self, name, project=None):
|
||||
try:
|
||||
return self.get_metadata(name, project)
|
||||
except errors.QueueDoesNotExist:
|
||||
return {}
|
||||
|
||||
def create(self, name, metadata=None, project=None):
|
||||
def _create(self, name, metadata=None, project=None):
|
||||
if project is None:
|
||||
project = ''
|
||||
|
||||
@ -89,7 +89,7 @@ class QueueController(storage.Queue):
|
||||
|
||||
return res.rowcount == 1
|
||||
|
||||
def exists(self, name, project):
|
||||
def _exists(self, name, project):
|
||||
if project is None:
|
||||
project = ''
|
||||
|
||||
@ -120,7 +120,7 @@ class QueueController(storage.Queue):
|
||||
finally:
|
||||
res.close()
|
||||
|
||||
def delete(self, name, project):
|
||||
def _delete(self, name, project):
|
||||
if project is None:
|
||||
project = ''
|
||||
|
||||
@ -129,7 +129,7 @@ class QueueController(storage.Queue):
|
||||
tables.Queues.c.name == name))
|
||||
self.driver.run(dlt)
|
||||
|
||||
def stats(self, name, project):
|
||||
def _stats(self, name, project):
|
||||
if project is None:
|
||||
project = ''
|
||||
|
||||
|
@ -69,28 +69,28 @@ class QueueController(storage.Queue):
|
||||
def __init__(self, driver):
|
||||
pass
|
||||
|
||||
def list(self, project=None):
|
||||
def _list(self, project=None):
|
||||
raise NotImplementedError()
|
||||
|
||||
def get(self, name, project=None):
|
||||
def _get(self, name, project=None):
|
||||
raise NotImplementedError()
|
||||
|
||||
def get_metadata(self, name, project=None):
|
||||
raise NotImplementedError()
|
||||
|
||||
def create(self, name, metadata=None, project=None):
|
||||
def _create(self, name, metadata=None, project=None):
|
||||
raise NotImplementedError()
|
||||
|
||||
def exists(self, name, project=None):
|
||||
def _exists(self, name, project=None):
|
||||
raise NotImplementedError()
|
||||
|
||||
def set_metadata(self, name, metadata, project=None):
|
||||
raise NotImplementedError()
|
||||
|
||||
def delete(self, name, project=None):
|
||||
def _delete(self, name, project=None):
|
||||
raise NotImplementedError()
|
||||
|
||||
def stats(self, name, project=None):
|
||||
def _stats(self, name, project=None):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user