Rename shards to pool

Shard turned out to be quite an overloaded/confusing term to use here,
although correct. We decided to rename our shard feature to pool.

P.S: I swear I didn't use sed O.O

Change-Id: Ic54f29a4da7d7690c9c9210b74876b96f0ae0eac
This commit is contained in:
Flavio Percoco 2014-05-29 15:25:24 +02:00
parent e89b390197
commit 15d55e7601
40 changed files with 628 additions and 630 deletions

View File

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""shards: JSON schema for marconi-queues shards resources."""
"""pools: JSON schema for marconi-queues pools resources."""
# NOTE(cpp-cabrera): options can be anything. These will be unique to
# each storage driver, so we don't perform any further validation at

View File

@ -21,7 +21,7 @@ from marconi.common import errors
from marconi.openstack.common.cache import cache as oslo_cache
from marconi.openstack.common import log
from marconi.queues.storage import pipeline
from marconi.queues.storage import sharding
from marconi.queues.storage import pooling
from marconi.queues.storage import utils as storage_utils
from marconi.queues import transport # NOQA
@ -38,13 +38,14 @@ CONF = cfg.CONF
CONF.register_cli_opts(_CLI_OPTIONS)
_GENERAL_OPTIONS = (
cfg.BoolOpt('sharding', default=False,
help=('Enable sharding across multiple storage backends. ',
'If sharding is enabled, the storage driver ',
cfg.BoolOpt('pooling', default=False,
help=('Enable pooling across multiple storage backends. ',
'If pooling is enabled, the storage driver ',
'configuration is used to determine where the ',
'catalogue/control plane data is kept.')),
'catalogue/control plane data is kept.'),
deprecated_opts=[cfg.DeprecatedOpt('sharding')]),
cfg.BoolOpt('admin_mode', default=False,
help='Activate endpoints to manage shard registry.'),
help='Activate endpoints to manage pool registry.'),
)
_DRIVER_OPTIONS = (
@ -81,9 +82,9 @@ class Bootstrap(object):
def storage(self):
LOG.debug(u'Loading storage driver')
if self.conf.sharding:
LOG.debug(u'Storage sharding enabled')
storage_driver = sharding.DataDriver(self.conf, self.cache,
if self.conf.pooling:
LOG.debug(u'Storage pooling enabled')
storage_driver = pooling.DataDriver(self.conf, self.cache,
self.control)
else:
storage_driver = storage_utils.load_storage_driver(

View File

@ -23,10 +23,10 @@ CatalogueBase = base.CatalogueBase
Claim = base.Claim
Message = base.Message
Queue = base.Queue
ShardsBase = base.ShardsBase
PoolsBase = base.PoolsBase
DEFAULT_QUEUES_PER_PAGE = base.DEFAULT_QUEUES_PER_PAGE
DEFAULT_MESSAGES_PER_PAGE = base.DEFAULT_MESSAGES_PER_PAGE
DEFAULT_SHARDS_PER_PAGE = base.DEFAULT_SHARDS_PER_PAGE
DEFAULT_POOLS_PER_PAGE = base.DEFAULT_POOLS_PER_PAGE
DEFAULT_MESSAGES_PER_CLAIM = base.DEFAULT_MESSAGES_PER_CLAIM

View File

@ -20,7 +20,7 @@ import six
DEFAULT_QUEUES_PER_PAGE = 10
DEFAULT_MESSAGES_PER_PAGE = 10
DEFAULT_SHARDS_PER_PAGE = 10
DEFAULT_POOLS_PER_PAGE = 10
DEFAULT_MESSAGES_PER_CLAIM = 10
@ -48,7 +48,7 @@ class DataDriverBase(DriverBase):
core functionality of the system.
Connection information and driver-specific options are
loaded from the config file or the shard catalog.
loaded from the config file or the pool catalog.
:param conf: Configuration containing options for this driver.
:type conf: `oslo.config.ConfigOpts`
@ -89,8 +89,8 @@ class ControlDriverBase(DriverBase):
modify aspects of the functionality of the system. This is ideal
for administrative purposes.
Allows access to the shard registry through a catalogue and a
shard controller.
Allows access to the pool registry through a catalogue and a
pool controller.
:param conf: Configuration containing options for this driver.
:type conf: `oslo.config.ConfigOpts`
@ -105,8 +105,8 @@ class ControlDriverBase(DriverBase):
raise NotImplementedError
@abc.abstractproperty
def shards_controller(self):
"""Returns storage's shard management controller."""
def pools_controller(self):
"""Returns storage's pool management controller."""
raise NotImplementedError
@ -390,71 +390,71 @@ class Claim(ControllerBase):
@six.add_metaclass(abc.ABCMeta)
class ShardsBase(ControllerBase):
"""A controller for managing shards."""
class PoolsBase(ControllerBase):
"""A controller for managing pools."""
@abc.abstractmethod
def list(self, marker=None, limit=DEFAULT_SHARDS_PER_PAGE,
def list(self, marker=None, limit=DEFAULT_POOLS_PER_PAGE,
detailed=False):
"""Lists all registered shards.
"""Lists all registered pools.
:param marker: used to determine which shard to start with
:param marker: used to determine which pool to start with
:type marker: six.text_type
:param limit: (Default 10) Max number of results to return
:type limit: int
:param detailed: whether to include options
:type detailed: bool
:returns: A list of shards - name, weight, uri
:returns: A list of pools - name, weight, uri
:rtype: [{}]
"""
raise NotImplementedError
@abc.abstractmethod
def create(self, name, weight, uri, options=None):
"""Registers a shard entry.
"""Registers a pool entry.
:param name: The name of this shard
:param name: The name of this pool
:type name: six.text_type
:param weight: the likelihood that this shard will be used
:param weight: the likelihood that this pool will be used
:type weight: int
:param uri: A URI that can be used by a storage client
(e.g., pymongo) to access this shard.
(e.g., pymongo) to access this pool.
:type uri: six.text_type
:param options: Options used to configure this shard
:param options: Options used to configure this pool
:type options: dict
"""
raise NotImplementedError
@abc.abstractmethod
def get(self, name, detailed=False):
"""Returns a single shard entry.
"""Returns a single pool entry.
:param name: The name of this shard
:param name: The name of this pool
:type name: six.text_type
:param detailed: Should the options data be included?
:type detailed: bool
:returns: weight, uri, and options for this shard
:returns: weight, uri, and options for this pool
:rtype: {}
:raises: ShardDoesNotExist if not found
:raises: PoolDoesNotExist if not found
"""
raise NotImplementedError
@abc.abstractmethod
def exists(self, name):
"""Returns a single shard entry.
"""Returns a single pool entry.
:param name: The name of this shard
:param name: The name of this pool
:type name: six.text_type
:returns: True if the shard exists
:returns: True if the pool exists
:rtype: bool
"""
raise NotImplementedError
@abc.abstractmethod
def delete(self, name):
"""Removes a shard entry.
"""Removes a pool entry.
:param name: The name of this shard
:param name: The name of this pool
:type name: six.text_type
:rtype: None
"""
@ -462,19 +462,19 @@ class ShardsBase(ControllerBase):
@abc.abstractmethod
def update(self, name, **kwargs):
"""Updates the weight, uris, and/or options of this shard
"""Updates the weight, uris, and/or options of this pool
:param name: Name of the shard
:param name: Name of the pool
:type name: text
:param kwargs: one of: `uri`, `weight`, `options`
:type kwargs: dict
:raises: ShardDoesNotExist
:raises: PoolDoesNotExist
"""
raise NotImplementedError
@abc.abstractmethod
def drop_all(self):
"""Deletes all shards from storage."""
"""Deletes all pools from storage."""
raise NotImplementedError
@ -482,7 +482,7 @@ class ShardsBase(ControllerBase):
class CatalogueBase(ControllerBase):
"""A controller for managing the catalogue. The catalogue is
responsible for maintaining a mapping between project.queue
entries to their shard.
entries to their pool.
"""
@abc.abstractmethod
@ -493,21 +493,21 @@ class CatalogueBase(ControllerBase):
:param project: The project to use when filtering through queue
entries.
:type project: six.text_type
:returns: [{'project': ..., 'queue': ..., 'shard': ...},]
:returns: [{'project': ..., 'queue': ..., 'pool': ...},]
:rtype: [dict]
"""
raise NotImplementedError
@abc.abstractmethod
def get(self, project, queue):
"""Returns the shard identifier for the queue registered under this
"""Returns the pool identifier for the queue registered under this
project.
:param project: Namespace to search for the given queue
:type project: six.text_type
:param queue: The name of the queue to search for
:type queue: six.text_type
:returns: {'shard': ...}
:returns: {'pool': ...}
:rtype: dict
:raises: QueueNotMapped
"""
@ -526,15 +526,15 @@ class CatalogueBase(ControllerBase):
"""
@abc.abstractmethod
def insert(self, project, queue, shard):
def insert(self, project, queue, pool):
"""Creates a new catalogue entry, or updates it if it already existed.
:param project: str - Namespace to insert the given queue into
:type project: six.text_type
:param queue: str - The name of the queue to insert
:type queue: six.text_type
:param shard: shard identifier to associate this queue with
:type shard: six.text_type
:param pool: pool identifier to associate this queue with
:type pool: six.text_type
"""
raise NotImplementedError
@ -550,15 +550,15 @@ class CatalogueBase(ControllerBase):
raise NotImplementedError
@abc.abstractmethod
def update(self, project, queue, shards=None):
"""Updates the shard identifier for this queue
def update(self, project, queue, pools=None):
"""Updates the pool identifier for this queue
:param project: Namespace to search
:type project: six.text_type
:param queue: The name of the queue
:type queue: six.text_type
:param shards: The name of the shard where this project/queue lives.
:type shards: six.text_type
:param pools: The name of the pool where this project/queue lives.
:type pools: six.text_type
:raises: QueueNotMapped
"""
raise NotImplementedError

View File

@ -112,7 +112,7 @@ class ClaimDoesNotExist(DoesNotExist):
class QueueNotMapped(DoesNotExist):
msg_format = (u'No shard found for '
msg_format = (u'No pool found for '
u'queue {queue} for project {project}')
def __init__(self, queue, project):
@ -127,17 +127,17 @@ class MessageIsClaimedBy(NotPermitted):
super(MessageIsClaimedBy, self).__init__(cid=cid, mid=mid)
class ShardDoesNotExist(DoesNotExist):
class PoolDoesNotExist(DoesNotExist):
msg_format = u'Shard {shard} does not exist'
msg_format = u'Pool {pool} does not exist'
def __init__(self, shard):
super(ShardDoesNotExist, self).__init__(shard=shard)
def __init__(self, pool):
super(PoolDoesNotExist, self).__init__(pool=pool)
class NoShardFound(ExceptionBase):
class NoPoolFound(ExceptionBase):
msg_format = u'No shards registered'
msg_format = u'No pools registered'
def __init__(self):
super(NoShardFound, self).__init__()
super(NoPoolFound, self).__init__()

View File

@ -15,11 +15,11 @@
"""MongoDB storage controller for the queues catalogue.
Serves to construct an association between a project + queue -> shard
Serves to construct an association between a project + queue -> pool
{
'p_q': project_queue :: six.text_type,
's': shard_identifier :: six.text_type
's': pool_identifier :: six.text_type
}
"""
@ -47,10 +47,10 @@ class CatalogueController(base.CatalogueBase):
self._col.ensure_index(CATALOGUE_INDEX, unique=True)
@utils.raises_conn_error
def _insert(self, project, queue, shard, upsert):
def _insert(self, project, queue, pool, upsert):
key = utils.scope_queue_name(queue, project)
return self._col.update({PRIMARY_KEY: key},
{'$set': {'s': shard}}, upsert=upsert)
{'$set': {'s': pool}}, upsert=upsert)
@utils.raises_conn_error
def list(self, project):
@ -77,18 +77,18 @@ class CatalogueController(base.CatalogueBase):
key = utils.scope_queue_name(queue, project)
return self._col.find_one({PRIMARY_KEY: key}) is not None
def insert(self, project, queue, shard):
def insert(self, project, queue, pool):
# NOTE(cpp-cabrera): _insert handles conn_error
self._insert(project, queue, shard, upsert=True)
self._insert(project, queue, pool, upsert=True)
@utils.raises_conn_error
def delete(self, project, queue):
self._col.remove({PRIMARY_KEY: utils.scope_queue_name(queue, project)},
w=0)
def update(self, project, queue, shard=None):
def update(self, project, queue, pool=None):
# NOTE(cpp-cabrera): _insert handles conn_error
res = self._insert(project, queue, shard, upsert=False)
res = self._insert(project, queue, pool, upsert=False)
if not res['updatedExisting']:
raise errors.QueueNotMapped(project, queue)
@ -104,5 +104,5 @@ def _normalize(entry):
return {
'queue': queue,
'project': project,
'shard': entry['s']
'pool': entry['s']
}

View File

@ -25,12 +25,12 @@ Field Mappings:
from marconi.queues.storage.mongodb import catalogue
from marconi.queues.storage.mongodb import claims
from marconi.queues.storage.mongodb import messages
from marconi.queues.storage.mongodb import pools
from marconi.queues.storage.mongodb import queues
from marconi.queues.storage.mongodb import shards
CatalogueController = catalogue.CatalogueController
ClaimController = claims.ClaimController
MessageController = messages.MessageController
QueueController = queues.QueueController
ShardsController = shards.ShardsController
PoolsController = pools.PoolsController

View File

@ -45,8 +45,8 @@ class DataDriver(storage.DataDriverBase):
opts = options.MONGODB_OPTIONS
# NOTE(cpp-cabrera): if this data driver is being loaded
# dynamically, as would be the case for a sharded context,
# filter out the options that were given by the shard
# dynamically, as would be the case for a pooled context,
# filter out the options that were given by the pool
# catalogue to avoid DuplicateOptErrors.
if 'dynamic' in conf:
names = conf[options.MONGODB_GROUP].keys()
@ -131,8 +131,8 @@ class ControlDriver(storage.ControlDriverBase):
return self.connection[name]
@property
def shards_controller(self):
return controllers.ShardsController(self)
def pools_controller(self):
return controllers.PoolsController(self)
@property
def catalogue_controller(self):

View File

@ -20,9 +20,6 @@ from oslo.config import cfg
MONGODB_OPTIONS = (
cfg.StrOpt('uri', help='Mongodb Connection URI.'),
# Database name
# TODO(kgriffs): Consider local sharding across DBs to mitigate
# per-DB locking latency.
cfg.StrOpt('database', default='marconi', help='Database name.'),
cfg.IntOpt('partitions', default=2,

View File

@ -12,7 +12,7 @@
# License for the specific language governing permissions and limitations under
# the License.
"""shards: an implementation of the shard management storage
"""pools: an implementation of the pool management storage
controller for mongodb.
Schema:
@ -29,7 +29,7 @@ from marconi.queues.storage import base
from marconi.queues.storage import errors
from marconi.queues.storage.mongodb import utils
SHARDS_INDEX = [
POOLS_INDEX = [
('n', 1)
]
@ -42,15 +42,15 @@ def _field_spec(detailed=False):
return dict(OMIT_FIELDS + (() if detailed else (('o', False),)))
class ShardsController(base.ShardsBase):
class PoolsController(base.PoolsBase):
def __init__(self, *args, **kwargs):
super(ShardsController, self).__init__(*args, **kwargs)
super(PoolsController, self).__init__(*args, **kwargs)
self._col = self.driver.database.shards
self._col.ensure_index(SHARDS_INDEX,
self._col = self.driver.database.pools
self._col.ensure_index(POOLS_INDEX,
background=True,
name='shards_name',
name='pools_name',
unique=True)
@utils.raises_conn_error
@ -69,7 +69,7 @@ class ShardsController(base.ShardsBase):
res = self._col.find_one({'n': name},
_field_spec(detailed))
if not res:
raise errors.ShardDoesNotExist(name)
raise errors.PoolDoesNotExist(name)
return _normalize(res, detailed)
@ -96,7 +96,7 @@ class ShardsController(base.ShardsBase):
{'$set': fields},
upsert=False)
if not res['updatedExisting']:
raise errors.ShardDoesNotExist(name)
raise errors.PoolDoesNotExist(name)
@utils.raises_conn_error
def delete(self, name):
@ -105,16 +105,16 @@ class ShardsController(base.ShardsBase):
@utils.raises_conn_error
def drop_all(self):
self._col.drop()
self._col.ensure_index(SHARDS_INDEX, unique=True)
self._col.ensure_index(POOLS_INDEX, unique=True)
def _normalize(shard, detailed=False):
def _normalize(pool, detailed=False):
ret = {
'name': shard['n'],
'uri': shard['u'],
'weight': shard['w'],
'name': pool['n'],
'uri': pool['u'],
'weight': pool['w'],
}
if detailed:
ret['options'] = shard['o']
ret['options'] = pool['o']
return ret

View File

@ -31,10 +31,10 @@ _CATALOG_OPTIONS = (
help='Catalog storage driver.'),
)
_CATALOG_GROUP = 'sharding:catalog'
_CATALOG_GROUP = 'pooling:catalog'
# NOTE(kgriffs): E.g.: 'marconi-sharding:5083853/my-queue'
_SHARD_CACHE_PREFIX = 'sharding:'
# NOTE(kgriffs): E.g.: 'marconi-pooling:5083853/my-queue'
_POOL_CACHE_PREFIX = 'pooling:'
# TODO(kgriffs): If a queue is migrated, everyone's
# caches need to have the relevant entry invalidated
@ -42,58 +42,58 @@ _SHARD_CACHE_PREFIX = 'sharding:'
# on the TTL.
#
# TODO(kgriffs): Make dynamic?
_SHARD_CACHE_TTL = 10
_POOL_CACHE_TTL = 10
def _config_options():
return [(_CATALOG_GROUP, _CATALOG_OPTIONS)]
def _shard_cache_key(queue, project=None):
def _pool_cache_key(queue, project=None):
# NOTE(kgriffs): Use string concatenation for performance,
# also put project first since it is guaranteed to be
# unique, which should reduce lookup time.
return _SHARD_CACHE_PREFIX + str(project) + '/' + queue
return _POOL_CACHE_PREFIX + str(project) + '/' + queue
class DataDriver(storage.DataDriverBase):
"""Sharding meta-driver for routing requests to multiple backends.
"""Pooling meta-driver for routing requests to multiple backends.
:param conf: Configuration from which to read sharding options
:param conf: Configuration from which to read pooling options
:param cache: Cache instance that will be passed to individual
storage driver instances that correspond to each shard. will
also be used by the shard controller to reduce latency for
storage driver instances that correspond to each pool. will
also be used by the pool controller to reduce latency for
some operations.
"""
def __init__(self, conf, cache, control):
super(DataDriver, self).__init__(conf, cache)
self._shard_catalog = Catalog(conf, cache, control)
self._pool_catalog = Catalog(conf, cache, control)
def is_alive(self):
return all(self._shard_catalog.get_driver(shard['name']).is_alive()
for shard in
self._shard_catalog._shards_ctrl.list(limit=0))
return all(self._pool_catalog.get_driver(pool['name']).is_alive()
for pool in
self._pool_catalog._pools_ctrl.list(limit=0))
@decorators.lazy_property(write=False)
def queue_controller(self):
return QueueController(self._shard_catalog)
return QueueController(self._pool_catalog)
@decorators.lazy_property(write=False)
def message_controller(self):
return MessageController(self._shard_catalog)
return MessageController(self._pool_catalog)
@decorators.lazy_property(write=False)
def claim_controller(self):
return ClaimController(self._shard_catalog)
return ClaimController(self._pool_catalog)
class RoutingController(storage.base.ControllerBase):
"""Routes operations to the appropriate shard.
"""Routes operations to the appropriate pool.
This controller stands in for a regular storage controller,
routing operations to a driver instance that represents
the shard to which the queue has been assigned.
the pool to which the queue has been assigned.
Do not instantiate this class directly; use one of the
more specific child classes instead.
@ -101,16 +101,16 @@ class RoutingController(storage.base.ControllerBase):
_resource_name = None
def __init__(self, shard_catalog):
def __init__(self, pool_catalog):
super(RoutingController, self).__init__(None)
self._ctrl_property_name = self._resource_name + '_controller'
self._shard_catalog = shard_catalog
self._pool_catalog = pool_catalog
@decorators.memoized_getattr
def __getattr__(self, name):
# NOTE(kgriffs): Use a closure trick to avoid
# some attr lookups each time forward() is called.
lookup = self._shard_catalog.lookup
lookup = self._pool_catalog.lookup
# NOTE(kgriffs): Assume that every controller method
# that is exposed to the transport declares queue name
@ -132,16 +132,16 @@ class QueueController(RoutingController):
_resource_name = 'queue'
def __init__(self, shard_catalog):
super(QueueController, self).__init__(shard_catalog)
self._lookup = self._shard_catalog.lookup
def __init__(self, pool_catalog):
super(QueueController, self).__init__(pool_catalog)
self._lookup = self._pool_catalog.lookup
def list(self, project=None, marker=None,
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False):
def all_pages():
for shard in self._shard_catalog._shards_ctrl.list(limit=0):
yield next(self._shard_catalog.get_driver(shard['name'])
for pool in self._pool_catalog._pools_ctrl.list(limit=0):
yield next(self._pool_catalog.get_driver(pool['name'])
.queue_controller.list(
project=project,
marker=marker,
@ -166,7 +166,7 @@ class QueueController(RoutingController):
yield marker_name['next']
def create(self, name, project=None):
self._shard_catalog.register(name, project)
self._pool_catalog.register(name, project)
# NOTE(cpp-cabrera): This should always succeed since we just
# registered the project/queue. There is a race condition,
@ -195,7 +195,7 @@ class QueueController(RoutingController):
# latter case is more difficult to reason about, and may
# yield 500s in some operations.
control = target.queue_controller
self._shard_catalog.deregister(name, project)
self._pool_catalog.deregister(name, project)
ret = control.delete(name, project)
return ret
@ -234,9 +234,9 @@ class QueueController(RoutingController):
class MessageController(RoutingController):
_resource_name = 'message'
def __init__(self, shard_catalog):
super(MessageController, self).__init__(shard_catalog)
self._lookup = self._shard_catalog.lookup
def __init__(self, pool_catalog):
super(MessageController, self).__init__(pool_catalog)
self._lookup = self._pool_catalog.lookup
def post(self, queue, project, messages, client_uuid):
target = self._lookup(queue, project)
@ -294,9 +294,9 @@ class MessageController(RoutingController):
class ClaimController(RoutingController):
_resource_name = 'claim'
def __init__(self, shard_catalog):
super(ClaimController, self).__init__(shard_catalog)
self._lookup = self._shard_catalog.lookup
def __init__(self, pool_catalog):
super(ClaimController, self).__init__(pool_catalog)
self._lookup = self._pool_catalog.lookup
def create(self, queue, metadata, project=None, limit=None):
target = self._lookup(queue, project)
@ -332,7 +332,7 @@ class ClaimController(RoutingController):
class Catalog(object):
"""Represents the mapping between queues and shard drivers."""
"""Represents the mapping between queues and pool drivers."""
def __init__(self, conf, cache, control):
self._drivers = {}
@ -342,74 +342,74 @@ class Catalog(object):
self._conf.register_opts(_CATALOG_OPTIONS, group=_CATALOG_GROUP)
self._catalog_conf = self._conf[_CATALOG_GROUP]
self._shards_ctrl = control.shards_controller
self._pools_ctrl = control.pools_controller
self._catalogue_ctrl = control.catalogue_controller
# FIXME(cpp-cabrera): https://bugs.launchpad.net/marconi/+bug/1252791
def _init_driver(self, shard_id):
"""Given a shard name, returns a storage driver.
def _init_driver(self, pool_id):
"""Given a pool name, returns a storage driver.
:param shard_id: The name of a shard.
:type shard_id: six.text_type
:param pool_id: The name of a pool.
:type pool_id: six.text_type
:returns: a storage driver
:rtype: marconi.queues.storage.base.DataDriver
"""
shard = self._shards_ctrl.get(shard_id, detailed=True)
conf = utils.dynamic_conf(shard['uri'], shard['options'])
pool = self._pools_ctrl.get(pool_id, detailed=True)
conf = utils.dynamic_conf(pool['uri'], pool['options'])
return utils.load_storage_driver(conf, self._cache)
@decorators.caches(_shard_cache_key, _SHARD_CACHE_TTL)
def _shard_id(self, queue, project=None):
"""Get the ID for the shard assigned to the given queue.
@decorators.caches(_pool_cache_key, _POOL_CACHE_TTL)
def _pool_id(self, queue, project=None):
"""Get the ID for the pool assigned to the given queue.
:param queue: name of the queue
:param project: project to which the queue belongs
:returns: shard id
:returns: pool id
:raises: `errors.QueueNotMapped`
"""
return self._catalogue_ctrl.get(project, queue)['shard']
return self._catalogue_ctrl.get(project, queue)['pool']
def register(self, queue, project=None):
"""Register a new queue in the shard catalog.
"""Register a new queue in the pool catalog.
This method should be called whenever a new queue is being
created, and will create an entry in the shard catalog for
created, and will create an entry in the pool catalog for
the given queue.
After using this method to register the queue in the
catalog, the caller should call `lookup()` to get a reference
to a storage driver which will allow interacting with the
queue's assigned backend shard.
queue's assigned backend pool.
:param queue: Name of the new queue to assign to a shard
:param queue: Name of the new queue to assign to a pool
:type queue: six.text_type
:param project: Project to which the queue belongs, or
None for the "global" or "generic" project.
:type project: six.text_type
:raises: NoShardFound
:raises: NoPoolFound
"""
# NOTE(cpp-cabrera): only register a queue if the entry
# doesn't exist
if not self._catalogue_ctrl.exists(project, queue):
# NOTE(cpp-cabrera): limit=0 implies unlimited - select from
# all shards
shard = select.weighted(self._shards_ctrl.list(limit=0))
# all pools
pool = select.weighted(self._pools_ctrl.list(limit=0))
if not shard:
raise errors.NoShardFound()
if not pool:
raise errors.NoPoolFound()
self._catalogue_ctrl.insert(project, queue, shard['name'])
self._catalogue_ctrl.insert(project, queue, pool['name'])
@_shard_id.purges
@_pool_id.purges
def deregister(self, queue, project=None):
"""Removes a queue from the shard catalog.
"""Removes a queue from the pool catalog.
Call this method after successfully deleting it from a
backend shard.
backend pool.
:param queue: Name of the new queue to assign to a shard
:param queue: Name of the new queue to assign to a pool
:type queue: six.text_type
:param project: Project to which the queue belongs, or
None for the "global" or "generic" project.
@ -418,20 +418,20 @@ class Catalog(object):
self._catalogue_ctrl.delete(project, queue)
def lookup(self, queue, project=None):
"""Lookup a shard driver for the given queue and project.
"""Lookup a pool driver for the given queue and project.
:param queue: Name of the queue for which to find a shard
:param queue: Name of the queue for which to find a pool
:param project: Project to which the queue belongs, or
None to specify the "global" or "generic" project.
:returns: A storage driver instance for the appropriate shard. If
:returns: A storage driver instance for the appropriate pool. If
the driver does not exist yet, it is created and cached. If the
queue is not mapped, returns None.
:rtype: Maybe DataDriver
"""
try:
shard_id = self._shard_id(queue, project)
pool_id = self._pool_id(queue, project)
except errors.QueueNotMapped as ex:
LOG.debug(ex)
@ -441,21 +441,21 @@ class Catalog(object):
# the place.
return None
return self.get_driver(shard_id)
return self.get_driver(pool_id)
def get_driver(self, shard_id):
"""Get storage driver, preferably cached, from a shard name.
def get_driver(self, pool_id):
"""Get storage driver, preferably cached, from a pool name.
:param shard_id: The name of a shard.
:type shard_id: six.text_type
:param pool_id: The name of a pool.
:type pool_id: six.text_type
:returns: a storage driver
:rtype: marconi.queues.storage.base.DataDriver
"""
try:
return self._drivers[shard_id]
return self._drivers[pool_id]
except KeyError:
# NOTE(cpp-cabrera): cache storage driver connection
self._drivers[shard_id] = self._init_driver(shard_id)
self._drivers[pool_id] = self._init_driver(pool_id)
return self._drivers[shard_id]
return self._drivers[pool_id]

View File

@ -15,9 +15,9 @@
"""Sql storage controller for the queues catalogue.
Serves to construct an association between a project + queue -> shard
Serves to construct an association between a project + queue -> pool
name: string -> Shards.name
name: string -> Pools.name
project: string
queue: string
"""
@ -68,15 +68,15 @@ class CatalogueController(base.CatalogueBase):
except errors.QueueNotMapped:
return False
def insert(self, project, queue, shard):
def insert(self, project, queue, pool):
try:
stmt = sa.sql.insert(tables.Catalogue).values(
project=project, queue=queue, shard=shard
project=project, queue=queue, pool=pool
)
self._conn.execute(stmt)
except sa.exc.IntegrityError:
self.update(project, queue, shard)
self.update(project, queue, pool)
def delete(self, project, queue):
stmt = sa.sql.delete(tables.Catalogue).where(
@ -84,8 +84,8 @@ class CatalogueController(base.CatalogueBase):
)
self._conn.execute(stmt)
def update(self, project, queue, shard=None):
if shard is None:
def update(self, project, queue, pool=None):
if pool is None:
return
if not self.exists(project, queue):
@ -93,7 +93,7 @@ class CatalogueController(base.CatalogueBase):
stmt = sa.sql.update(tables.Catalogue).where(
_match(project, queue)
).values(shard=shard)
).values(pool=pool)
self._conn.execute(stmt)
def drop_all(self):
@ -106,5 +106,5 @@ def _normalize(entry):
return {
'queue': queue,
'project': project,
'shard': name
'pool': name
}

View File

@ -16,12 +16,12 @@
from marconi.queues.storage.sqlalchemy import catalogue
from marconi.queues.storage.sqlalchemy import claims
from marconi.queues.storage.sqlalchemy import messages
from marconi.queues.storage.sqlalchemy import pools
from marconi.queues.storage.sqlalchemy import queues
from marconi.queues.storage.sqlalchemy import shards
QueueController = queues.QueueController
ClaimController = claims.ClaimController
MessageController = messages.MessageController
CatalogueController = catalogue.CatalogueController
ShardsController = shards.ShardsController
PoolsController = pools.PoolsController

View File

@ -140,8 +140,8 @@ class ControlDriver(storage.ControlDriverBase):
self.connection.close()
@property
def shards_controller(self):
return controllers.ShardsController(self)
def pools_controller(self):
return controllers.PoolsController(self)
@property
def catalogue_controller(self):

View File

@ -12,7 +12,7 @@
# License for the specific language governing permissions and limitations under
# the License.
"""shards: an implementation of the shard management storage
"""pools: an implementation of the pool management storage
controller for sqlalchemy.
Schema:
@ -33,10 +33,10 @@ from marconi.queues.storage.sqlalchemy import tables
from marconi.queues.storage.sqlalchemy import utils
class ShardsController(base.ShardsBase):
class PoolsController(base.PoolsBase):
def __init__(self, *args, **kwargs):
super(ShardsController, self).__init__(*args, **kwargs)
super(PoolsController, self).__init__(*args, **kwargs)
self._conn = self.driver.connection
@ -47,8 +47,8 @@ class ShardsController(base.ShardsBase):
# TODO(cpp-cabrera): optimization - limit the columns returned
# when detailed=False by specifying them in the select()
# clause
stmt = sa.sql.select([tables.Shards]).where(
tables.Shards.c.name > marker
stmt = sa.sql.select([tables.Pools]).where(
tables.Pools.c.name > marker
).limit(limit)
cursor = self._conn.execute(stmt)
@ -57,15 +57,15 @@ class ShardsController(base.ShardsBase):
@utils.raises_conn_error
def get(self, name, detailed=False):
stmt = sa.sql.select([tables.Shards]).where(
tables.Shards.c.name == name
stmt = sa.sql.select([tables.Pools]).where(
tables.Pools.c.name == name
)
shard = self._conn.execute(stmt).fetchone()
if shard is None:
raise errors.ShardDoesNotExist(name)
pool = self._conn.execute(stmt).fetchone()
if pool is None:
raise errors.PoolDoesNotExist(name)
return _normalize(shard, detailed)
return _normalize(pool, detailed)
# TODO(cpp-cabrera): rename to upsert
@utils.raises_conn_error
@ -73,7 +73,7 @@ class ShardsController(base.ShardsBase):
opts = None if options is None else utils.json_encode(options)
try:
stmt = sa.sql.expression.insert(tables.Shards).values(
stmt = sa.sql.expression.insert(tables.Pools).values(
name=name, weight=weight, uri=uri, options=opts
)
self._conn.execute(stmt)
@ -86,8 +86,8 @@ class ShardsController(base.ShardsBase):
@utils.raises_conn_error
def exists(self, name):
stmt = sa.sql.select([tables.Shards.c.name]).where(
tables.Shards.c.name == name
stmt = sa.sql.select([tables.Pools.c.name]).where(
tables.Pools.c.name == name
).limit(1)
return self._conn.execute(stmt).fetchone() is not None
@ -105,34 +105,34 @@ class ShardsController(base.ShardsBase):
if 'options' in fields:
fields['options'] = utils.json_encode(fields['options'])
stmt = sa.sql.update(tables.Shards).where(
tables.Shards.c.name == name).values(**fields)
stmt = sa.sql.update(tables.Pools).where(
tables.Pools.c.name == name).values(**fields)
res = self._conn.execute(stmt)
if res.rowcount == 0:
raise errors.ShardDoesNotExist(name)
raise errors.PoolDoesNotExist(name)
@utils.raises_conn_error
def delete(self, name):
stmt = sa.sql.expression.delete(tables.Shards).where(
tables.Shards.c.name == name
stmt = sa.sql.expression.delete(tables.Pools).where(
tables.Pools.c.name == name
)
self._conn.execute(stmt)
@utils.raises_conn_error
def drop_all(self):
stmt = sa.sql.expression.delete(tables.Shards)
stmt = sa.sql.expression.delete(tables.Pools)
self._conn.execute(stmt)
def _normalize(shard, detailed=False):
def _normalize(pool, detailed=False):
ret = {
'name': shard[0],
'uri': shard[1],
'weight': shard[2],
'name': pool[0],
'uri': pool[1],
'weight': pool[2],
}
if detailed:
opts = shard[3]
opts = pool[3]
ret['options'] = utils.json_decode(opts) if opts else None
return ret

View File

@ -57,7 +57,7 @@ Queues = sa.Table('Queues', metadata,
)
Shards = sa.Table('Shards', metadata,
Pools = sa.Table('Pools', metadata,
sa.Column('name', sa.String(64), primary_key=True),
sa.Column('uri', sa.String(255), nullable=False),
sa.Column('weight', sa.INTEGER, nullable=False),
@ -65,8 +65,8 @@ Shards = sa.Table('Shards', metadata,
Catalogue = sa.Table('Catalogue', metadata,
sa.Column('shard', sa.String(64),
sa.ForeignKey('Shards.name',
sa.Column('pool', sa.String(64),
sa.ForeignKey('Pools.name',
ondelete='CASCADE'),
primary_key=True),
sa.Column('project', sa.String(64), nullable=False),

View File

@ -26,9 +26,9 @@ LOG = log.getLogger(__name__)
def dynamic_conf(uri, options):
"""Given metadata, yields a dynamic configuration.
:param uri: shard location
:param uri: pool location
:type uri: six.text_type
:param options: additional shard metadata
:param options: additional pool metadata
:type options: dict
:returns: Configuration object suitable for constructing storage
drivers

View File

@ -17,8 +17,8 @@ from marconi.queues.transport.wsgi.v1_0 import health
from marconi.queues.transport.wsgi.v1_0 import homedoc
from marconi.queues.transport.wsgi.v1_0 import messages
from marconi.queues.transport.wsgi.v1_0 import metadata
from marconi.queues.transport.wsgi.v1_0 import pools
from marconi.queues.transport.wsgi.v1_0 import queues
from marconi.queues.transport.wsgi.v1_0 import shards
from marconi.queues.transport.wsgi.v1_0 import stats
@ -70,11 +70,11 @@ def public_endpoints(driver):
def private_endpoints(driver):
shards_controller = driver._control.shards_controller
pools_controller = driver._control.pools_controller
return [
('/shards',
shards.Listing(shards_controller)),
('/shards/{shard}',
shards.Resource(shards_controller)),
('/pools',
pools.Listing(pools_controller)),
('/pools/{pool}',
pools.Resource(pools_controller)),
]

View File

@ -13,10 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""shards: a resource to handle storage shard management
"""pools: a resource to handle storage pool management
A shard is added by an operator by interacting with the
sharding-related endpoints. When specifying a shard, the
A pool is added by an operator by interacting with the
pooling-related endpoints. When specifying a pool, the
following fields are required:
{
@ -25,7 +25,7 @@ following fields are required:
"uri": string::uri
}
Furthermore, depending on the underlying storage type of shard being
Furthermore, depending on the underlying storage type of pool being
registered, there is an optional field:
{
@ -36,7 +36,7 @@ registered, there is an optional field:
import falcon
import jsonschema
from marconi.common.schemas import shards as schema
from marconi.common.schemas import pools as schema
from marconi.common.transport.wsgi import utils
from marconi.common import utils as common_utils
from marconi.openstack.common import log
@ -49,15 +49,15 @@ LOG = log.getLogger(__name__)
class Listing(object):
"""A resource to list registered shards
"""A resource to list registered pools
:param shards_controller: means to interact with storage
:param pools_controller: means to interact with storage
"""
def __init__(self, shards_controller):
self._ctrl = shards_controller
def __init__(self, pools_controller):
self._ctrl = pools_controller
def on_get(self, request, response, project_id):
"""Returns a shard listing as objects embedded in an array:
"""Returns a pool listing as objects embedded in an array:
[
{"href": "", "weight": 100, "uri": ""},
@ -66,7 +66,7 @@ class Listing(object):
:returns: HTTP | [200, 204]
"""
LOG.debug(u'LIST shards')
LOG.debug(u'LIST pools')
store = {}
request.get_param('marker', store=store)
@ -74,11 +74,11 @@ class Listing(object):
request.get_param_as_bool('detailed', store=store)
results = {}
results['shards'] = list(self._ctrl.list(**store))
for entry in results['shards']:
results['pools'] = list(self._ctrl.list(**store))
for entry in results['pools']:
entry['href'] = request.path + '/' + entry.pop('name')
if not results['shards']:
if not results['pools']:
response.status = falcon.HTTP_204
return
@ -88,12 +88,12 @@ class Listing(object):
class Resource(object):
"""A handler for individual shard.
"""A handler for individual pool.
:param shards_controller: means to interact with storage
:param pools_controller: means to interact with storage
"""
def __init__(self, shards_controller):
self._ctrl = shards_controller
def __init__(self, pools_controller):
self._ctrl = pools_controller
validator_type = jsonschema.Draft4Validator
self._validators = {
'weight': validator_type(schema.patch_weight),
@ -102,21 +102,21 @@ class Resource(object):
'create': validator_type(schema.create)
}
def on_get(self, request, response, project_id, shard):
"""Returns a JSON object for a single shard entry:
def on_get(self, request, response, project_id, pool):
"""Returns a JSON object for a single pool entry:
{"weight": 100, "uri": "", options: {...}}
:returns: HTTP | [200, 404]
"""
LOG.debug(u'GET shard - name: %s', shard)
LOG.debug(u'GET pool - name: %s', pool)
data = None
detailed = request.get_param_as_bool('detailed') or False
try:
data = self._ctrl.get(shard, detailed)
data = self._ctrl.get(pool, detailed)
except errors.ShardDoesNotExist as ex:
except errors.PoolDoesNotExist as ex:
LOG.debug(ex)
raise falcon.HTTPNotFound()
@ -127,8 +127,8 @@ class Resource(object):
response.body = transport_utils.to_json(data)
response.content_location = request.relative_uri
def on_put(self, request, response, project_id, shard):
"""Registers a new shard. Expects the following input:
def on_put(self, request, response, project_id, pool):
"""Registers a new pool. Expects the following input:
{"weight": 100, "uri": ""}
@ -136,7 +136,7 @@ class Resource(object):
:returns: HTTP | [201, 204]
"""
LOG.debug(u'PUT shard - name: %s', shard)
LOG.debug(u'PUT pool - name: %s', pool)
data = utils.load(request)
utils.validate(self._validators['create'], data)
@ -144,23 +144,23 @@ class Resource(object):
raise wsgi_errors.HTTPBadRequestBody(
'cannot connect to %s' % data['uri']
)
self._ctrl.create(shard, weight=data['weight'],
self._ctrl.create(pool, weight=data['weight'],
uri=data['uri'],
options=data.get('options', {}))
response.status = falcon.HTTP_201
response.location = request.path
def on_delete(self, request, response, project_id, shard):
"""Deregisters a shard.
def on_delete(self, request, response, project_id, pool):
"""Deregisters a pool.
:returns: HTTP | 204
"""
LOG.debug(u'DELETE shard - name: %s', shard)
self._ctrl.delete(shard)
LOG.debug(u'DELETE pool - name: %s', pool)
self._ctrl.delete(pool)
response.status = falcon.HTTP_204
def on_patch(self, request, response, project_id, shard):
"""Allows one to update a shard's weight, uri, and/or options.
def on_patch(self, request, response, project_id, pool):
"""Allows one to update a pool's weight, uri, and/or options.
This method expects the user to submit a JSON object
containing at least one of: 'uri', 'weight', 'options'. If
@ -171,12 +171,12 @@ class Resource(object):
:returns: HTTP | 200,400
"""
LOG.debug(u'PATCH shard - name: %s', shard)
LOG.debug(u'PATCH pool - name: %s', pool)
data = utils.load(request)
EXPECT = ('weight', 'uri', 'options')
if not any([(field in data) for field in EXPECT]):
LOG.debug(u'PATCH shard, bad params')
LOG.debug(u'PATCH pool, bad params')
raise wsgi_errors.HTTPBadRequestBody(
'One of `uri`, `weight`, or `options` needs '
'to be specified'
@ -193,7 +193,7 @@ class Resource(object):
pred=lambda v: v is not None)
try:
self._ctrl.update(shard, **fields)
except errors.ShardDoesNotExist as ex:
self._ctrl.update(pool, **fields)
except errors.PoolDoesNotExist as ex:
LOG.exception(ex)
raise falcon.HTTPNotFound()

View File

@ -18,8 +18,8 @@ from marconi.queues.transport.wsgi.v1_1 import homedoc
from marconi.queues.transport.wsgi.v1_1 import messages
from marconi.queues.transport.wsgi.v1_1 import metadata
from marconi.queues.transport.wsgi.v1_1 import ping
from marconi.queues.transport.wsgi.v1_1 import pools
from marconi.queues.transport.wsgi.v1_1 import queues
from marconi.queues.transport.wsgi.v1_1 import shards
from marconi.queues.transport.wsgi.v1_1 import stats
@ -75,11 +75,11 @@ def public_endpoints(driver):
def private_endpoints(driver):
shards_controller = driver._control.shards_controller
pools_controller = driver._control.pools_controller
return [
('/shards',
shards.Listing(shards_controller)),
('/shards/{shard}',
shards.Resource(shards_controller)),
('/pools',
pools.Listing(pools_controller)),
('/pools/{pool}',
pools.Resource(pools_controller)),
]

View File

@ -13,10 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""shards: a resource to handle storage shard management
"""pools: a resource to handle storage pool management
A shard is added by an operator by interacting with the
sharding-related endpoints. When specifying a shard, the
A pool is added by an operator by interacting with the
pooling-related endpoints. When specifying a pool, the
following fields are required:
{
@ -25,7 +25,7 @@ following fields are required:
"uri": string::uri
}
Furthermore, depending on the underlying storage type of shard being
Furthermore, depending on the underlying storage type of pool being
registered, there is an optional field:
{
@ -36,7 +36,7 @@ registered, there is an optional field:
import falcon
import jsonschema
from marconi.common.schemas import shards as schema
from marconi.common.schemas import pools as schema
from marconi.common.transport.wsgi import utils
from marconi.common import utils as common_utils
from marconi.openstack.common import log
@ -49,15 +49,15 @@ LOG = log.getLogger(__name__)
class Listing(object):
"""A resource to list registered shards
"""A resource to list registered pools
:param shards_controller: means to interact with storage
:param pools_controller: means to interact with storage
"""
def __init__(self, shards_controller):
self._ctrl = shards_controller
def __init__(self, pools_controller):
self._ctrl = pools_controller
def on_get(self, request, response, project_id):
"""Returns a shard listing as objects embedded in an array:
"""Returns a pool listing as objects embedded in an array:
[
{"href": "", "weight": 100, "uri": ""},
@ -66,7 +66,7 @@ class Listing(object):
:returns: HTTP | [200, 204]
"""
LOG.debug(u'LIST shards')
LOG.debug(u'LIST pools')
store = {}
request.get_param('marker', store=store)
@ -74,11 +74,11 @@ class Listing(object):
request.get_param_as_bool('detailed', store=store)
results = {}
results['shards'] = list(self._ctrl.list(**store))
for entry in results['shards']:
results['pools'] = list(self._ctrl.list(**store))
for entry in results['pools']:
entry['href'] = request.path + '/' + entry.pop('name')
if not results['shards']:
if not results['pools']:
response.status = falcon.HTTP_204
return
@ -88,12 +88,12 @@ class Listing(object):
class Resource(object):
"""A handler for individual shard.
"""A handler for individual pool.
:param shards_controller: means to interact with storage
:param pools_controller: means to interact with storage
"""
def __init__(self, shards_controller):
self._ctrl = shards_controller
def __init__(self, pools_controller):
self._ctrl = pools_controller
validator_type = jsonschema.Draft4Validator
self._validators = {
'weight': validator_type(schema.patch_weight),
@ -102,21 +102,21 @@ class Resource(object):
'create': validator_type(schema.create)
}
def on_get(self, request, response, project_id, shard):
"""Returns a JSON object for a single shard entry:
def on_get(self, request, response, project_id, pool):
"""Returns a JSON object for a single pool entry:
{"weight": 100, "uri": "", options: {...}}
:returns: HTTP | [200, 404]
"""
LOG.debug(u'GET shard - name: %s', shard)
LOG.debug(u'GET pool - name: %s', pool)
data = None
detailed = request.get_param_as_bool('detailed') or False
try:
data = self._ctrl.get(shard, detailed)
data = self._ctrl.get(pool, detailed)
except errors.ShardDoesNotExist as ex:
except errors.PoolDoesNotExist as ex:
LOG.debug(ex)
raise falcon.HTTPNotFound()
@ -127,8 +127,8 @@ class Resource(object):
response.body = transport_utils.to_json(data)
response.content_location = request.relative_uri
def on_put(self, request, response, project_id, shard):
"""Registers a new shard. Expects the following input:
def on_put(self, request, response, project_id, pool):
"""Registers a new pool. Expects the following input:
{"weight": 100, "uri": ""}
@ -136,7 +136,7 @@ class Resource(object):
:returns: HTTP | [201, 204]
"""
LOG.debug(u'PUT shard - name: %s', shard)
LOG.debug(u'PUT pool - name: %s', pool)
data = utils.load(request)
utils.validate(self._validators['create'], data)
@ -144,23 +144,23 @@ class Resource(object):
raise wsgi_errors.HTTPBadRequestBody(
'cannot connect to %s' % data['uri']
)
self._ctrl.create(shard, weight=data['weight'],
self._ctrl.create(pool, weight=data['weight'],
uri=data['uri'],
options=data.get('options', {}))
response.status = falcon.HTTP_201
response.location = request.path
def on_delete(self, request, response, project_id, shard):
"""Deregisters a shard.
def on_delete(self, request, response, project_id, pool):
"""Deregisters a pool.
:returns: HTTP | 204
"""
LOG.debug(u'DELETE shard - name: %s', shard)
self._ctrl.delete(shard)
LOG.debug(u'DELETE pool - name: %s', pool)
self._ctrl.delete(pool)
response.status = falcon.HTTP_204
def on_patch(self, request, response, project_id, shard):
"""Allows one to update a shard's weight, uri, and/or options.
def on_patch(self, request, response, project_id, pool):
"""Allows one to update a pool's weight, uri, and/or options.
This method expects the user to submit a JSON object
containing at least one of: 'uri', 'weight', 'options'. If
@ -171,12 +171,12 @@ class Resource(object):
:returns: HTTP | 200,400
"""
LOG.debug(u'PATCH shard - name: %s', shard)
LOG.debug(u'PATCH pool - name: %s', pool)
data = utils.load(request)
EXPECT = ('weight', 'uri', 'options')
if not any([(field in data) for field in EXPECT]):
LOG.debug(u'PATCH shard, bad params')
LOG.debug(u'PATCH pool, bad params')
raise wsgi_errors.HTTPBadRequestBody(
'One of `uri`, `weight`, or `options` needs '
'to be specified'
@ -193,7 +193,7 @@ class Resource(object):
pred=lambda v: v is not None)
try:
self._ctrl.update(shard, **fields)
except errors.ShardDoesNotExist as ex:
self._ctrl.update(pool, **fields)
except errors.PoolDoesNotExist as ex:
LOG.exception(ex)
raise falcon.HTTPNotFound()

View File

@ -50,7 +50,7 @@ class ControlDriver(storage.ControlDriverBase):
return None
@property
def shards_controller(self):
def pools_controller(self):
return None

View File

@ -133,7 +133,7 @@ def entries(controller, count):
@contextlib.contextmanager
def shard_entry(controller, project, queue, shard):
def pool_entry(controller, project, queue, pool):
"""Creates a catalogue entry with the given details, and deletes
it once the context manager goes out of scope.
@ -143,18 +143,18 @@ def shard_entry(controller, project, queue, shard):
:type project: six.text_type
:param queue: name of queue
:type queue: six.text_type
:param shard: an identifier for the shard
:type shard: six.text_type
:returns: (project, queue, shard)
:param pool: an identifier for the pool
:type pool: six.text_type
:returns: (project, queue, pool)
:rtype: (six.text_type, six.text_type, six.text_type)
"""
controller.insert(project, queue, shard)
yield (project, queue, shard)
controller.insert(project, queue, pool)
yield (project, queue, pool)
controller.delete(project, queue)
@contextlib.contextmanager
def shard_entries(controller, count):
def pool_entries(controller, count):
"""Creates `count` catalogue entries with the given details, and
deletes them once the context manager goes out of scope.
@ -162,7 +162,7 @@ def shard_entries(controller, count):
:type controller: queues.storage.base:CatalogueBase
:param count: number of entries to create
:type count: int
:returns: [(project, queue, shard)]
:returns: [(project, queue, pool)]
:rtype: [(six.text_type, six.text_type, six.text_type)]
"""
spec = [(u'_', six.text_type(uuid.uuid1()), six.text_type(i))

View File

@ -646,115 +646,115 @@ class ClaimControllerTest(ControllerBaseTest):
project=self.project)
class ShardsControllerTest(ControllerBaseTest):
"""Shards Controller base tests.
class PoolsControllerTest(ControllerBaseTest):
"""Pools Controller base tests.
NOTE(flaper87): Implementations of this class should
override the tearDown method in order
to clean up storage's state.
"""
controller_base_class = storage.ShardsBase
controller_base_class = storage.PoolsBase
def setUp(self):
super(ShardsControllerTest, self).setUp()
self.shards_controller = self.driver.shards_controller
super(PoolsControllerTest, self).setUp()
self.pools_controller = self.driver.pools_controller
# Let's create one shard
self.shard = str(uuid.uuid1())
self.shards_controller.create(self.shard, 100, 'localhost', {})
# Let's create one pool
self.pool = str(uuid.uuid1())
self.pools_controller.create(self.pool, 100, 'localhost', {})
def tearDown(self):
self.shards_controller.drop_all()
super(ShardsControllerTest, self).tearDown()
self.pools_controller.drop_all()
super(PoolsControllerTest, self).tearDown()
def test_create_succeeds(self):
self.shards_controller.create(str(uuid.uuid1()),
self.pools_controller.create(str(uuid.uuid1()),
100, 'localhost', {})
def test_create_replaces_on_duplicate_insert(self):
name = str(uuid.uuid1())
self.shards_controller.create(name,
self.pools_controller.create(name,
100, 'localhost', {})
self.shards_controller.create(name,
self.pools_controller.create(name,
111, 'localhost2', {})
entry = self.shards_controller.get(name)
self._shard_expects(entry, xname=name, xweight=111,
entry = self.pools_controller.get(name)
self._pool_expects(entry, xname=name, xweight=111,
xlocation='localhost2')
def _shard_expects(self, shard, xname, xweight, xlocation):
self.assertIn('name', shard)
self.assertEqual(shard['name'], xname)
self.assertIn('weight', shard)
self.assertEqual(shard['weight'], xweight)
self.assertIn('uri', shard)
self.assertEqual(shard['uri'], xlocation)
def _pool_expects(self, pool, xname, xweight, xlocation):
self.assertIn('name', pool)
self.assertEqual(pool['name'], xname)
self.assertIn('weight', pool)
self.assertEqual(pool['weight'], xweight)
self.assertIn('uri', pool)
self.assertEqual(pool['uri'], xlocation)
def test_get_returns_expected_content(self):
res = self.shards_controller.get(self.shard)
self._shard_expects(res, self.shard, 100, 'localhost')
res = self.pools_controller.get(self.pool)
self._pool_expects(res, self.pool, 100, 'localhost')
self.assertNotIn('options', res)
def test_detailed_get_returns_expected_content(self):
res = self.shards_controller.get(self.shard, detailed=True)
res = self.pools_controller.get(self.pool, detailed=True)
self.assertIn('options', res)
self.assertEqual(res['options'], {})
def test_get_raises_if_not_found(self):
self.assertRaises(storage.errors.ShardDoesNotExist,
self.shards_controller.get, 'notexists')
self.assertRaises(storage.errors.PoolDoesNotExist,
self.pools_controller.get, 'notexists')
def test_exists(self):
self.assertTrue(self.shards_controller.exists(self.shard))
self.assertFalse(self.shards_controller.exists('notexists'))
self.assertTrue(self.pools_controller.exists(self.pool))
self.assertFalse(self.pools_controller.exists('notexists'))
def test_update_raises_assertion_error_on_bad_fields(self):
self.assertRaises(AssertionError, self.shards_controller.update,
self.shard)
self.assertRaises(AssertionError, self.pools_controller.update,
self.pool)
def test_update_works(self):
self.shards_controller.update(self.shard, weight=101,
self.pools_controller.update(self.pool, weight=101,
uri='redis://localhost',
options={'a': 1})
res = self.shards_controller.get(self.shard, detailed=True)
self._shard_expects(res, self.shard, 101, 'redis://localhost')
res = self.pools_controller.get(self.pool, detailed=True)
self._pool_expects(res, self.pool, 101, 'redis://localhost')
self.assertEqual(res['options'], {'a': 1})
def test_delete_works(self):
self.shards_controller.delete(self.shard)
self.assertFalse(self.shards_controller.exists(self.shard))
self.pools_controller.delete(self.pool)
self.assertFalse(self.pools_controller.exists(self.pool))
def test_delete_nonexistent_is_silent(self):
self.shards_controller.delete('nonexisting')
self.pools_controller.delete('nonexisting')
def test_drop_all_leads_to_empty_listing(self):
self.shards_controller.drop_all()
cursor = self.shards_controller.list()
self.pools_controller.drop_all()
cursor = self.pools_controller.list()
self.assertRaises(StopIteration, next, cursor)
def test_listing_simple(self):
# NOTE(cpp-cabrera): base entry interferes with listing results
self.shards_controller.delete(self.shard)
self.pools_controller.delete(self.pool)
name_gen = lambda i: chr(ord('A') + i)
for i in range(15):
self.shards_controller.create(name_gen(i), i, str(i), {})
self.pools_controller.create(name_gen(i), i, str(i), {})
res = list(self.shards_controller.list())
res = list(self.pools_controller.list())
self.assertEqual(len(res), 10)
for i, entry in enumerate(res):
self._shard_expects(entry, name_gen(i), i, str(i))
self._pool_expects(entry, name_gen(i), i, str(i))
self.assertNotIn('options', entry)
res = list(self.shards_controller.list(limit=5))
res = list(self.pools_controller.list(limit=5))
self.assertEqual(len(res), 5)
res = next(self.shards_controller.list(marker=name_gen(3)))
self._shard_expects(res, name_gen(4), 4, '4')
res = next(self.pools_controller.list(marker=name_gen(3)))
self._pool_expects(res, name_gen(4), 4, '4')
res = list(self.shards_controller.list(detailed=True))
res = list(self.pools_controller.list(detailed=True))
self.assertEqual(len(res), 10)
for i, entry in enumerate(res):
self._shard_expects(entry, name_gen(i), i, str(i))
self._pool_expects(entry, name_gen(i), i, str(i))
self.assertIn('options', entry)
self.assertEqual(entry['options'], {})
@ -775,15 +775,15 @@ class CatalogueControllerTest(ControllerBaseTest):
def _check_structure(self, entry):
self.assertIn('queue', entry)
self.assertIn('project', entry)
self.assertIn('shard', entry)
self.assertIn('pool', entry)
self.assertIsInstance(entry['queue'], six.text_type)
self.assertIsInstance(entry['project'], six.text_type)
self.assertIsInstance(entry['shard'], six.text_type)
self.assertIsInstance(entry['pool'], six.text_type)
def _check_value(self, entry, xqueue, xproject, xshard):
def _check_value(self, entry, xqueue, xproject, xpool):
self.assertEqual(entry['queue'], xqueue)
self.assertEqual(entry['project'], xproject)
self.assertEqual(entry['shard'], xshard)
self.assertEqual(entry['pool'], xpool)
def test_catalogue_entry_life_cycle(self):
queue = self.queue
@ -794,13 +794,13 @@ class CatalogueControllerTest(ControllerBaseTest):
self.fail('There should be no entries at this time')
# create a listing, check its length
with helpers.shard_entries(self.controller, 10) as expect:
with helpers.pool_entries(self.controller, 10) as expect:
project = expect[0][0]
xs = list(self.controller.list(project))
self.assertEqual(len(xs), 10)
# create, check existence, delete
with helpers.shard_entry(self.controller, project, queue, u'a'):
with helpers.pool_entry(self.controller, project, queue, u'a'):
self.assertTrue(self.controller.exists(project, queue))
# verify it no longer exists
@ -810,20 +810,20 @@ class CatalogueControllerTest(ControllerBaseTest):
self.assertEqual(len(list(self.controller.list(project))), 0)
def test_list(self):
with helpers.shard_entries(self.controller, 10) as expect:
with helpers.pool_entries(self.controller, 10) as expect:
values = zip(self.controller.list(u'_'), expect)
for e, x in values:
p, q, s = x
self._check_structure(e)
self._check_value(e, xqueue=q, xproject=p, xshard=s)
self._check_value(e, xqueue=q, xproject=p, xpool=s)
def test_update(self):
with helpers.shard_entry(self.controller, self.project,
with helpers.pool_entry(self.controller, self.project,
self.queue, u'a') as expect:
p, q, s = expect
self.controller.update(p, q, shard=u'b')
self.controller.update(p, q, pool=u'b')
entry = self.controller.get(p, q)
self._check_value(entry, xqueue=q, xproject=p, xshard=u'b')
self._check_value(entry, xqueue=q, xproject=p, xpool=u'b')
def test_update_raises_when_entry_does_not_exist(self):
self.assertRaises(errors.QueueNotMapped,
@ -831,15 +831,15 @@ class CatalogueControllerTest(ControllerBaseTest):
'not', 'not', 'a')
def test_get(self):
with helpers.shard_entry(self.controller,
with helpers.pool_entry(self.controller,
self.project,
self.queue, u'a') as expect:
p, q, s = expect
e = self.controller.get(p, q)
self._check_value(e, xqueue=q, xproject=p, xshard=s)
self._check_value(e, xqueue=q, xproject=p, xpool=s)
def test_get_raises_if_does_not_exist(self):
with helpers.shard_entry(self.controller,
with helpers.pool_entry(self.controller,
self.project,
self.queue, u'a') as expect:
p, q, _ = expect
@ -854,7 +854,7 @@ class CatalogueControllerTest(ControllerBaseTest):
'non_existing', 'non_existing')
def test_exists(self):
with helpers.shard_entry(self.controller,
with helpers.pool_entry(self.controller,
self.project,
self.queue, u'a') as expect:
p, q, _ = expect

View File

@ -47,8 +47,8 @@ class TestBase(testing.TestBase):
self.srmock = ftest.StartResponseMock()
def tearDown(self):
if self.conf.sharding:
self.boot.control.shards_controller.drop_all()
if self.conf.pooling:
self.boot.control.pools_controller.drop_all()
self.boot.control.catalogue_controller.drop_all()
super(TestBase, self).tearDown()

View File

@ -18,8 +18,8 @@ from marconi.tests.queues.transport.wsgi.v1 import test_default_limits
from marconi.tests.queues.transport.wsgi.v1 import test_home
from marconi.tests.queues.transport.wsgi.v1 import test_media_type
from marconi.tests.queues.transport.wsgi.v1 import test_messages
from marconi.tests.queues.transport.wsgi.v1 import test_queue_lifecycle as lc
from marconi.tests.queues.transport.wsgi.v1 import test_shards
from marconi.tests.queues.transport.wsgi.v1 import test_pools
from marconi.tests.queues.transport.wsgi.v1 import test_queue_lifecycle as l
TestAuth = test_auth.TestAuth
TestClaimsFaultyDriver = test_claims.TestClaimsFaultyDriver
@ -30,10 +30,10 @@ TestHomeDocument = test_home.TestHomeDocument
TestMediaType = test_media_type.TestMediaType
TestMessagesFaultyDriver = test_messages.TestMessagesFaultyDriver
TestMessagesMongoDB = test_messages.TestMessagesMongoDB
TestMessagesMongoDBSharded = test_messages.TestMessagesMongoDBSharded
TestMessagesMongoDBPooled = test_messages.TestMessagesMongoDBPooled
TestMessagesSqlalchemy = test_messages.TestMessagesSqlalchemy
TestQueueFaultyDriver = lc.TestQueueLifecycleFaultyDriver
TestQueueLifecycleMongoDB = lc.TestQueueLifecycleMongoDB
TestQueueLifecycleSqlalchemy = lc.TestQueueLifecycleSqlalchemy
TestShardsMongoDB = test_shards.TestShardsMongoDB
TestShardsSqlalchemy = test_shards.TestShardsSqlalchemy
TestQueueFaultyDriver = l.TestQueueLifecycleFaultyDriver
TestQueueLifecycleMongoDB = l.TestQueueLifecycleMongoDB
TestQueueLifecycleSqlalchemy = l.TestQueueLifecycleSqlalchemy
TestPoolsMongoDB = test_pools.TestPoolsMongoDB
TestPoolsSqlalchemy = test_pools.TestPoolsSqlalchemy

View File

@ -36,11 +36,11 @@ class MessagesBaseTest(base.V1Base):
def setUp(self):
super(MessagesBaseTest, self).setUp()
if self.conf.sharding:
if self.conf.pooling:
for i in range(4):
uri = self.conf['drivers:storage:mongodb'].uri
doc = {'weight': 100, 'uri': uri}
self.simulate_put(self.url_prefix + '/shards/' + str(i),
self.simulate_put(self.url_prefix + '/pools/' + str(i),
body=jsonutils.dumps(doc))
self.assertEqual(self.srmock.status, falcon.HTTP_201)
@ -61,9 +61,9 @@ class MessagesBaseTest(base.V1Base):
def tearDown(self):
self.simulate_delete(self.queue_path, self.project_id)
if self.conf.sharding:
if self.conf.pooling:
for i in range(4):
self.simulate_delete(self.url_prefix + '/shards/' + str(i))
self.simulate_delete(self.url_prefix + '/pools/' + str(i))
super(MessagesBaseTest, self).tearDown()
@ -473,21 +473,21 @@ class TestMessagesMongoDB(MessagesBaseTest):
super(TestMessagesMongoDB, self).tearDown()
class TestMessagesMongoDBSharded(MessagesBaseTest):
class TestMessagesMongoDBPooled(MessagesBaseTest):
config_file = 'wsgi_mongodb_sharded.conf'
config_file = 'wsgi_mongodb_pooled.conf'
@testing.requires_mongodb
def setUp(self):
super(TestMessagesMongoDBSharded, self).setUp()
super(TestMessagesMongoDBPooled, self).setUp()
def tearDown(self):
super(TestMessagesMongoDBSharded, self).tearDown()
super(TestMessagesMongoDBPooled, self).tearDown()
# TODO(cpp-cabrera): remove this skipTest once sharded queue
# TODO(cpp-cabrera): remove this skipTest once pooled queue
# listing is implemented
def test_list(self):
self.skipTest("Need to implement sharded queue listing.")
self.skipTest("Need to implement pooled queue listing.")
class TestMessagesFaultyDriver(base.V1BaseFaulty):

View File

@ -25,13 +25,13 @@ from marconi.tests.queues.transport.wsgi import base
@contextlib.contextmanager
def shard(test, name, weight, uri, options={}):
"""A context manager for constructing a shard for use in testing.
def pool(test, name, weight, uri, options={}):
"""A context manager for constructing a pool for use in testing.
Deletes the shard after exiting the context.
Deletes the pool after exiting the context.
:param test: Must expose simulate_* methods
:param name: Name for this shard
:param name: Name for this pool
:type name: six.text_type
:type weight: int
:type uri: six.text_type
@ -40,7 +40,7 @@ def shard(test, name, weight, uri, options={}):
:rtype: see above
"""
doc = {'weight': weight, 'uri': uri, 'options': options}
path = test.url_prefix + '/shards/' + name
path = test.url_prefix + '/pools/' + name
test.simulate_put(path, body=jsonutils.dumps(doc))
@ -52,18 +52,18 @@ def shard(test, name, weight, uri, options={}):
@contextlib.contextmanager
def shards(test, count, uri):
"""A context manager for constructing shards for use in testing.
def pools(test, count, uri):
"""A context manager for constructing pools for use in testing.
Deletes the shards after exiting the context.
Deletes the pools after exiting the context.
:param test: Must expose simulate_* methods
:param count: Number of shards to create
:param count: Number of pools to create
:type count: int
:returns: (paths, weights, uris, options)
:rtype: ([six.text_type], [int], [six.text_type], [dict])
"""
base = test.url_prefix + '/shards/'
base = test.url_prefix + '/pools/'
args = [(base + str(i), i,
{str(i): i})
for i in range(count)]
@ -79,28 +79,28 @@ def shards(test, count, uri):
@ddt.ddt
class ShardsBaseTest(base.V1Base):
class PoolsBaseTest(base.V1Base):
def setUp(self):
super(ShardsBaseTest, self).setUp()
super(PoolsBaseTest, self).setUp()
self.doc = {'weight': 100, 'uri': 'sqlite://:memory:'}
self.shard = self.url_prefix + '/shards/' + str(uuid.uuid1())
self.simulate_put(self.shard, body=jsonutils.dumps(self.doc))
self.pool = self.url_prefix + '/pools/' + str(uuid.uuid1())
self.simulate_put(self.pool, body=jsonutils.dumps(self.doc))
self.assertEqual(self.srmock.status, falcon.HTTP_201)
def tearDown(self):
super(ShardsBaseTest, self).tearDown()
self.simulate_delete(self.shard)
super(PoolsBaseTest, self).tearDown()
self.simulate_delete(self.pool)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
def test_put_shard_works(self):
def test_put_pool_works(self):
name = str(uuid.uuid1())
weight, uri = self.doc['weight'], self.doc['uri']
with shard(self, name, weight, uri):
with pool(self, name, weight, uri):
self.assertEqual(self.srmock.status, falcon.HTTP_201)
def test_put_raises_if_missing_fields(self):
path = self.url_prefix + '/shards/' + str(uuid.uuid1())
path = self.url_prefix + '/pools/' + str(uuid.uuid1())
self.simulate_put(path, body=jsonutils.dumps({'weight': 100}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
@ -111,7 +111,7 @@ class ShardsBaseTest(base.V1Base):
@ddt.data(-1, 2**32+1, 'big')
def test_put_raises_if_invalid_weight(self, weight):
path = self.url_prefix + '/shards/' + str(uuid.uuid1())
path = self.url_prefix + '/pools/' + str(uuid.uuid1())
doc = {'weight': weight, 'uri': 'a'}
self.simulate_put(path,
body=jsonutils.dumps(doc))
@ -119,84 +119,84 @@ class ShardsBaseTest(base.V1Base):
@ddt.data(-1, 2**32+1, [], 'localhost:27017')
def test_put_raises_if_invalid_uri(self, uri):
path = self.url_prefix + '/shards/' + str(uuid.uuid1())
path = self.url_prefix + '/pools/' + str(uuid.uuid1())
self.simulate_put(path,
body=jsonutils.dumps({'weight': 1, 'uri': uri}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
@ddt.data(-1, 'wee', [])
def test_put_raises_if_invalid_options(self, options):
path = self.url_prefix + '/shards/' + str(uuid.uuid1())
path = self.url_prefix + '/pools/' + str(uuid.uuid1())
doc = {'weight': 1, 'uri': 'a', 'options': options}
self.simulate_put(path, body=jsonutils.dumps(doc))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def test_put_existing_overwrites(self):
# NOTE(cabrera): setUp creates default shard
# NOTE(cabrera): setUp creates default pool
expect = self.doc
self.simulate_put(self.shard,
self.simulate_put(self.pool,
body=jsonutils.dumps(expect))
self.assertEqual(self.srmock.status, falcon.HTTP_201)
result = self.simulate_get(self.shard)
result = self.simulate_get(self.pool)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
doc = jsonutils.loads(result[0])
self.assertEqual(doc['weight'], expect['weight'])
self.assertEqual(doc['uri'], expect['uri'])
def test_delete_works(self):
self.simulate_delete(self.shard)
self.simulate_delete(self.pool)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
self.simulate_get(self.shard)
self.simulate_get(self.pool)
self.assertEqual(self.srmock.status, falcon.HTTP_404)
def test_get_nonexisting_raises_404(self):
self.simulate_get(self.url_prefix + '/shards/nonexisting')
self.simulate_get(self.url_prefix + '/pools/nonexisting')
self.assertEqual(self.srmock.status, falcon.HTTP_404)
def _shard_expect(self, shard, xhref, xweight, xuri):
self.assertIn('href', shard)
self.assertEqual(shard['href'], xhref)
self.assertIn('weight', shard)
self.assertEqual(shard['weight'], xweight)
self.assertIn('uri', shard)
self.assertEqual(shard['uri'], xuri)
def _pool_expect(self, pool, xhref, xweight, xuri):
self.assertIn('href', pool)
self.assertEqual(pool['href'], xhref)
self.assertIn('weight', pool)
self.assertEqual(pool['weight'], xweight)
self.assertIn('uri', pool)
self.assertEqual(pool['uri'], xuri)
def test_get_works(self):
result = self.simulate_get(self.shard)
result = self.simulate_get(self.pool)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
shard = jsonutils.loads(result[0])
self._shard_expect(shard, self.shard, self.doc['weight'],
pool = jsonutils.loads(result[0])
self._pool_expect(pool, self.pool, self.doc['weight'],
self.doc['uri'])
def test_detailed_get_works(self):
result = self.simulate_get(self.shard,
result = self.simulate_get(self.pool,
query_string='?detailed=True')
self.assertEqual(self.srmock.status, falcon.HTTP_200)
shard = jsonutils.loads(result[0])
self._shard_expect(shard, self.shard, self.doc['weight'],
pool = jsonutils.loads(result[0])
self._pool_expect(pool, self.pool, self.doc['weight'],
self.doc['uri'])
self.assertIn('options', shard)
self.assertEqual(shard['options'], {})
self.assertIn('options', pool)
self.assertEqual(pool['options'], {})
def test_patch_raises_if_missing_fields(self):
self.simulate_patch(self.shard,
self.simulate_patch(self.pool,
body=jsonutils.dumps({'location': 1}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def _patch_test(self, doc):
self.simulate_patch(self.shard,
self.simulate_patch(self.pool,
body=jsonutils.dumps(doc))
self.assertEqual(self.srmock.status, falcon.HTTP_200)
result = self.simulate_get(self.shard,
result = self.simulate_get(self.pool,
query_string='?detailed=True')
self.assertEqual(self.srmock.status, falcon.HTTP_200)
shard = jsonutils.loads(result[0])
self._shard_expect(shard, self.shard, doc['weight'],
pool = jsonutils.loads(result[0])
self._pool_expect(pool, self.pool, doc['weight'],
doc['uri'])
self.assertEqual(shard['options'], doc['options'])
self.assertEqual(pool['options'], doc['options'])
def test_patch_works(self):
doc = {'weight': 101, 'uri': 'sqlite://:memory:', 'options': {'a': 1}}
@ -209,60 +209,60 @@ class ShardsBaseTest(base.V1Base):
@ddt.data(-1, 2**32+1, 'big')
def test_patch_raises_400_on_invalid_weight(self, weight):
self.simulate_patch(self.shard,
self.simulate_patch(self.pool,
body=jsonutils.dumps({'weight': weight}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
@ddt.data(-1, 2**32+1, [], 'localhost:27017')
def test_patch_raises_400_on_invalid_uri(self, uri):
self.simulate_patch(self.shard,
self.simulate_patch(self.pool,
body=jsonutils.dumps({'uri': uri}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
@ddt.data(-1, 'wee', [])
def test_patch_raises_400_on_invalid_options(self, options):
self.simulate_patch(self.shard,
self.simulate_patch(self.pool,
body=jsonutils.dumps({'options': options}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def test_patch_raises_404_if_shard_not_found(self):
self.simulate_patch(self.url_prefix + '/shards/notexists',
def test_patch_raises_404_if_pool_not_found(self):
self.simulate_patch(self.url_prefix + '/pools/notexists',
body=jsonutils.dumps({'weight': 1}))
self.assertEqual(self.srmock.status, falcon.HTTP_404)
def test_empty_listing_returns_204(self):
self.simulate_delete(self.shard)
self.simulate_get(self.url_prefix + '/shards')
self.simulate_delete(self.pool)
self.simulate_get(self.url_prefix + '/pools')
self.assertEqual(self.srmock.status, falcon.HTTP_204)
def _listing_test(self, count=10, limit=10,
marker=None, detailed=False):
# NOTE(cpp-cabrera): delete initial shard - it will interfere
# NOTE(cpp-cabrera): delete initial pool - it will interfere
# with listing tests
self.simulate_delete(self.shard)
self.simulate_delete(self.pool)
query = '?limit={0}&detailed={1}'.format(limit, detailed)
if marker:
query += '&marker={2}'.format(marker)
with shards(self, count, self.doc['uri']) as expected:
result = self.simulate_get(self.url_prefix + '/shards',
with pools(self, count, self.doc['uri']) as expected:
result = self.simulate_get(self.url_prefix + '/pools',
query_string=query)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
results = jsonutils.loads(result[0])
self.assertIsInstance(results, dict)
self.assertIn('shards', results)
shard_list = results['shards']
self.assertEqual(len(shard_list), min(limit, count))
for s in shard_list:
self.assertIn('pools', results)
pool_list = results['pools']
self.assertEqual(len(pool_list), min(limit, count))
for s in pool_list:
# NOTE(flwang): It can't assumed that both sqlalchemy and
# mongodb can return query result with the same order. Just
# like the order they're inserted. Actually, sqlalchemy can't
# guarantee that. So we're leveraging the relationship between
# shard weight and the index of shards fixture to get the
# right shard to verify.
# pool weight and the index of pools fixture to get the
# right pool to verify.
expect = expected[s['weight']]
path, weight = expect[:2]
self._shard_expect(s, path, weight, self.doc['uri'])
self._pool_expect(s, path, weight, self.doc['uri'])
if detailed:
self.assertIn('options', s)
self.assertEqual(s['options'], expect[-1])
@ -280,30 +280,30 @@ class ShardsBaseTest(base.V1Base):
self._listing_test(count=15, limit=limit)
def test_listing_marker_is_respected(self):
self.simulate_delete(self.shard)
self.simulate_delete(self.pool)
with shards(self, 10, self.doc['uri']) as expected:
result = self.simulate_get(self.url_prefix + '/shards',
with pools(self, 10, self.doc['uri']) as expected:
result = self.simulate_get(self.url_prefix + '/pools',
query_string='?marker=3')
self.assertEqual(self.srmock.status, falcon.HTTP_200)
shard_list = jsonutils.loads(result[0])['shards']
self.assertEqual(len(shard_list), 6)
pool_list = jsonutils.loads(result[0])['pools']
self.assertEqual(len(pool_list), 6)
path, weight = expected[4][:2]
self._shard_expect(shard_list[0], path, weight, self.doc['uri'])
self._pool_expect(pool_list[0], path, weight, self.doc['uri'])
class TestShardsMongoDB(ShardsBaseTest):
class TestPoolsMongoDB(PoolsBaseTest):
config_file = 'wsgi_mongodb.conf'
@testing.requires_mongodb
def setUp(self):
super(TestShardsMongoDB, self).setUp()
super(TestPoolsMongoDB, self).setUp()
class TestShardsSqlalchemy(ShardsBaseTest):
class TestPoolsSqlalchemy(PoolsBaseTest):
config_file = 'wsgi_sqlalchemy.conf'
def setUp(self):
super(TestShardsSqlalchemy, self).setUp()
super(TestPoolsSqlalchemy, self).setUp()

View File

@ -18,8 +18,8 @@ from marconi.tests.queues.transport.wsgi.v1_1 import test_default_limits
from marconi.tests.queues.transport.wsgi.v1_1 import test_home
from marconi.tests.queues.transport.wsgi.v1_1 import test_media_type
from marconi.tests.queues.transport.wsgi.v1_1 import test_messages
from marconi.tests.queues.transport.wsgi.v1_1 import test_queue_lifecycle as lc
from marconi.tests.queues.transport.wsgi.v1_1 import test_shards
from marconi.tests.queues.transport.wsgi.v1_1 import test_pools
from marconi.tests.queues.transport.wsgi.v1_1 import test_queue_lifecycle as l
TestAuth = test_auth.TestAuth
TestClaimsFaultyDriver = test_claims.TestClaimsFaultyDriver
@ -30,10 +30,10 @@ TestHomeDocument = test_home.TestHomeDocument
TestMediaType = test_media_type.TestMediaType
TestMessagesFaultyDriver = test_messages.TestMessagesFaultyDriver
TestMessagesMongoDB = test_messages.TestMessagesMongoDB
TestMessagesMongoDBSharded = test_messages.TestMessagesMongoDBSharded
TestMessagesMongoDBPooled = test_messages.TestMessagesMongoDBPooled
TestMessagesSqlalchemy = test_messages.TestMessagesSqlalchemy
TestQueueFaultyDriver = lc.TestQueueLifecycleFaultyDriver
TestQueueLifecycleMongoDB = lc.TestQueueLifecycleMongoDB
TestQueueLifecycleSqlalchemy = lc.TestQueueLifecycleSqlalchemy
TestShardsMongoDB = test_shards.TestShardsMongoDB
TestShardsSqlalchemy = test_shards.TestShardsSqlalchemy
TestQueueFaultyDriver = l.TestQueueLifecycleFaultyDriver
TestQueueLifecycleMongoDB = l.TestQueueLifecycleMongoDB
TestQueueLifecycleSqlalchemy = l.TestQueueLifecycleSqlalchemy
TestPoolsMongoDB = test_pools.TestPoolsMongoDB
TestPoolsSqlalchemy = test_pools.TestPoolsSqlalchemy

View File

@ -34,11 +34,11 @@ class MessagesBaseTest(base.V1_1Base):
def setUp(self):
super(MessagesBaseTest, self).setUp()
if self.conf.sharding:
if self.conf.pooling:
for i in range(4):
uri = self.conf['drivers:storage:mongodb'].uri
doc = {'weight': 100, 'uri': uri}
self.simulate_put(self.url_prefix + '/shards/' + str(i),
self.simulate_put(self.url_prefix + '/pools/' + str(i),
body=jsonutils.dumps(doc))
self.assertEqual(self.srmock.status, falcon.HTTP_201)
@ -59,9 +59,9 @@ class MessagesBaseTest(base.V1_1Base):
def tearDown(self):
self.simulate_delete(self.queue_path, headers=self.headers)
if self.conf.sharding:
if self.conf.pooling:
for i in range(4):
self.simulate_delete(self.url_prefix + '/shards/' + str(i),
self.simulate_delete(self.url_prefix + '/pools/' + str(i),
headers=self.headers)
super(MessagesBaseTest, self).tearDown()
@ -480,20 +480,20 @@ class TestMessagesMongoDB(MessagesBaseTest):
super(TestMessagesMongoDB, self).tearDown()
class TestMessagesMongoDBSharded(MessagesBaseTest):
config_file = 'wsgi_mongodb_sharded.conf'
class TestMessagesMongoDBPooled(MessagesBaseTest):
config_file = 'wsgi_mongodb_pooled.conf'
@testing.requires_mongodb
def setUp(self):
super(TestMessagesMongoDBSharded, self).setUp()
super(TestMessagesMongoDBPooled, self).setUp()
def tearDown(self):
super(TestMessagesMongoDBSharded, self).tearDown()
super(TestMessagesMongoDBPooled, self).tearDown()
# TODO(cpp-cabrera): remove this skipTest once sharded queue
# TODO(cpp-cabrera): remove this skipTest once pooled queue
# listing is implemented
def test_list(self):
self.skipTest("Need to implement sharded queue listing.")
self.skipTest("Need to implement pooled queue listing.")
class TestMessagesFaultyDriver(base.V1_1BaseFaulty):

View File

@ -24,13 +24,13 @@ from marconi.tests.queues.transport.wsgi import base
@contextlib.contextmanager
def shard(test, name, weight, uri, options={}):
"""A context manager for constructing a shard for use in testing.
def pool(test, name, weight, uri, options={}):
"""A context manager for constructing a pool for use in testing.
Deletes the shard after exiting the context.
Deletes the pool after exiting the context.
:param test: Must expose simulate_* methods
:param name: Name for this shard
:param name: Name for this pool
:type name: six.text_type
:type weight: int
:type uri: six.text_type
@ -39,7 +39,7 @@ def shard(test, name, weight, uri, options={}):
:rtype: see above
"""
doc = {'weight': weight, 'uri': uri, 'options': options}
path = test.url_prefix + '/shards/' + name
path = test.url_prefix + '/pools/' + name
test.simulate_put(path, body=jsonutils.dumps(doc))
@ -51,18 +51,18 @@ def shard(test, name, weight, uri, options={}):
@contextlib.contextmanager
def shards(test, count, uri):
"""A context manager for constructing shards for use in testing.
def pools(test, count, uri):
"""A context manager for constructing pools for use in testing.
Deletes the shards after exiting the context.
Deletes the pools after exiting the context.
:param test: Must expose simulate_* methods
:param count: Number of shards to create
:param count: Number of pools to create
:type count: int
:returns: (paths, weights, uris, options)
:rtype: ([six.text_type], [int], [six.text_type], [dict])
"""
base = test.url_prefix + '/shards/'
base = test.url_prefix + '/pools/'
args = [(base + str(i), i,
{str(i): i})
for i in range(count)]
@ -78,28 +78,28 @@ def shards(test, count, uri):
@ddt.ddt
class ShardsBaseTest(base.V1_1Base):
class PoolsBaseTest(base.V1_1Base):
def setUp(self):
super(ShardsBaseTest, self).setUp()
super(PoolsBaseTest, self).setUp()
self.doc = {'weight': 100, 'uri': 'sqlite://:memory:'}
self.shard = self.url_prefix + '/shards/' + str(uuid.uuid1())
self.simulate_put(self.shard, body=jsonutils.dumps(self.doc))
self.pool = self.url_prefix + '/pools/' + str(uuid.uuid1())
self.simulate_put(self.pool, body=jsonutils.dumps(self.doc))
self.assertEqual(self.srmock.status, falcon.HTTP_201)
def tearDown(self):
super(ShardsBaseTest, self).tearDown()
self.simulate_delete(self.shard)
super(PoolsBaseTest, self).tearDown()
self.simulate_delete(self.pool)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
def test_put_shard_works(self):
def test_put_pool_works(self):
name = str(uuid.uuid1())
weight, uri = self.doc['weight'], self.doc['uri']
with shard(self, name, weight, uri):
with pool(self, name, weight, uri):
self.assertEqual(self.srmock.status, falcon.HTTP_201)
def test_put_raises_if_missing_fields(self):
path = self.url_prefix + '/shards/' + str(uuid.uuid1())
path = self.url_prefix + '/pools/' + str(uuid.uuid1())
self.simulate_put(path, body=jsonutils.dumps({'weight': 100}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
@ -110,7 +110,7 @@ class ShardsBaseTest(base.V1_1Base):
@ddt.data(-1, 2**32+1, 'big')
def test_put_raises_if_invalid_weight(self, weight):
path = self.url_prefix + '/shards/' + str(uuid.uuid1())
path = self.url_prefix + '/pools/' + str(uuid.uuid1())
doc = {'weight': weight, 'uri': 'a'}
self.simulate_put(path,
body=jsonutils.dumps(doc))
@ -118,84 +118,84 @@ class ShardsBaseTest(base.V1_1Base):
@ddt.data(-1, 2**32+1, [], 'localhost:27017')
def test_put_raises_if_invalid_uri(self, uri):
path = self.url_prefix + '/shards/' + str(uuid.uuid1())
path = self.url_prefix + '/pools/' + str(uuid.uuid1())
self.simulate_put(path,
body=jsonutils.dumps({'weight': 1, 'uri': uri}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
@ddt.data(-1, 'wee', [])
def test_put_raises_if_invalid_options(self, options):
path = self.url_prefix + '/shards/' + str(uuid.uuid1())
path = self.url_prefix + '/pools/' + str(uuid.uuid1())
doc = {'weight': 1, 'uri': 'a', 'options': options}
self.simulate_put(path, body=jsonutils.dumps(doc))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def test_put_existing_overwrites(self):
# NOTE(cabrera): setUp creates default shard
# NOTE(cabrera): setUp creates default pool
expect = self.doc
self.simulate_put(self.shard,
self.simulate_put(self.pool,
body=jsonutils.dumps(expect))
self.assertEqual(self.srmock.status, falcon.HTTP_201)
result = self.simulate_get(self.shard)
result = self.simulate_get(self.pool)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
doc = jsonutils.loads(result[0])
self.assertEqual(doc['weight'], expect['weight'])
self.assertEqual(doc['uri'], expect['uri'])
def test_delete_works(self):
self.simulate_delete(self.shard)
self.simulate_delete(self.pool)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
self.simulate_get(self.shard)
self.simulate_get(self.pool)
self.assertEqual(self.srmock.status, falcon.HTTP_404)
def test_get_nonexisting_raises_404(self):
self.simulate_get(self.url_prefix + '/shards/nonexisting')
self.simulate_get(self.url_prefix + '/pools/nonexisting')
self.assertEqual(self.srmock.status, falcon.HTTP_404)
def _shard_expect(self, shard, xhref, xweight, xuri):
self.assertIn('href', shard)
self.assertEqual(shard['href'], xhref)
self.assertIn('weight', shard)
self.assertEqual(shard['weight'], xweight)
self.assertIn('uri', shard)
self.assertEqual(shard['uri'], xuri)
def _pool_expect(self, pool, xhref, xweight, xuri):
self.assertIn('href', pool)
self.assertEqual(pool['href'], xhref)
self.assertIn('weight', pool)
self.assertEqual(pool['weight'], xweight)
self.assertIn('uri', pool)
self.assertEqual(pool['uri'], xuri)
def test_get_works(self):
result = self.simulate_get(self.shard)
result = self.simulate_get(self.pool)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
shard = jsonutils.loads(result[0])
self._shard_expect(shard, self.shard, self.doc['weight'],
pool = jsonutils.loads(result[0])
self._pool_expect(pool, self.pool, self.doc['weight'],
self.doc['uri'])
def test_detailed_get_works(self):
result = self.simulate_get(self.shard,
result = self.simulate_get(self.pool,
query_string='?detailed=True')
self.assertEqual(self.srmock.status, falcon.HTTP_200)
shard = jsonutils.loads(result[0])
self._shard_expect(shard, self.shard, self.doc['weight'],
pool = jsonutils.loads(result[0])
self._pool_expect(pool, self.pool, self.doc['weight'],
self.doc['uri'])
self.assertIn('options', shard)
self.assertEqual(shard['options'], {})
self.assertIn('options', pool)
self.assertEqual(pool['options'], {})
def test_patch_raises_if_missing_fields(self):
self.simulate_patch(self.shard,
self.simulate_patch(self.pool,
body=jsonutils.dumps({'location': 1}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def _patch_test(self, doc):
self.simulate_patch(self.shard,
self.simulate_patch(self.pool,
body=jsonutils.dumps(doc))
self.assertEqual(self.srmock.status, falcon.HTTP_200)
result = self.simulate_get(self.shard,
result = self.simulate_get(self.pool,
query_string='?detailed=True')
self.assertEqual(self.srmock.status, falcon.HTTP_200)
shard = jsonutils.loads(result[0])
self._shard_expect(shard, self.shard, doc['weight'],
pool = jsonutils.loads(result[0])
self._pool_expect(pool, self.pool, doc['weight'],
doc['uri'])
self.assertEqual(shard['options'], doc['options'])
self.assertEqual(pool['options'], doc['options'])
def test_patch_works(self):
doc = {'weight': 101, 'uri': 'sqlite://:memory:', 'options': {'a': 1}}
@ -208,60 +208,60 @@ class ShardsBaseTest(base.V1_1Base):
@ddt.data(-1, 2**32+1, 'big')
def test_patch_raises_400_on_invalid_weight(self, weight):
self.simulate_patch(self.shard,
self.simulate_patch(self.pool,
body=jsonutils.dumps({'weight': weight}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
@ddt.data(-1, 2**32+1, [], 'localhost:27017')
def test_patch_raises_400_on_invalid_uri(self, uri):
self.simulate_patch(self.shard,
self.simulate_patch(self.pool,
body=jsonutils.dumps({'uri': uri}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
@ddt.data(-1, 'wee', [])
def test_patch_raises_400_on_invalid_options(self, options):
self.simulate_patch(self.shard,
self.simulate_patch(self.pool,
body=jsonutils.dumps({'options': options}))
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def test_patch_raises_404_if_shard_not_found(self):
self.simulate_patch(self.url_prefix + '/shards/notexists',
def test_patch_raises_404_if_pool_not_found(self):
self.simulate_patch(self.url_prefix + '/pools/notexists',
body=jsonutils.dumps({'weight': 1}))
self.assertEqual(self.srmock.status, falcon.HTTP_404)
def test_empty_listing_returns_204(self):
self.simulate_delete(self.shard)
self.simulate_get(self.url_prefix + '/shards')
self.simulate_delete(self.pool)
self.simulate_get(self.url_prefix + '/pools')
self.assertEqual(self.srmock.status, falcon.HTTP_204)
def _listing_test(self, count=10, limit=10,
marker=None, detailed=False):
# NOTE(cpp-cabrera): delete initial shard - it will interfere
# NOTE(cpp-cabrera): delete initial pool - it will interfere
# with listing tests
self.simulate_delete(self.shard)
self.simulate_delete(self.pool)
query = '?limit={0}&detailed={1}'.format(limit, detailed)
if marker:
query += '&marker={2}'.format(marker)
with shards(self, count, self.doc['uri']) as expected:
result = self.simulate_get(self.url_prefix + '/shards',
with pools(self, count, self.doc['uri']) as expected:
result = self.simulate_get(self.url_prefix + '/pools',
query_string=query)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
results = jsonutils.loads(result[0])
self.assertIsInstance(results, dict)
self.assertIn('shards', results)
shard_list = results['shards']
self.assertEqual(len(shard_list), min(limit, count))
for s in shard_list:
self.assertIn('pools', results)
pool_list = results['pools']
self.assertEqual(len(pool_list), min(limit, count))
for s in pool_list:
# NOTE(flwang): It can't assumed that both sqlalchemy and
# mongodb can return query result with the same order. Just
# like the order they're inserted. Actually, sqlalchemy can't
# guarantee that. So we're leveraging the relationship between
# shard weight and the index of shards fixture to get the
# right shard to verify.
# pool weight and the index of pools fixture to get the
# right pool to verify.
expect = expected[s['weight']]
path, weight = expect[:2]
self._shard_expect(s, path, weight, self.doc['uri'])
self._pool_expect(s, path, weight, self.doc['uri'])
if detailed:
self.assertIn('options', s)
self.assertEqual(s['options'], expect[-1])
@ -279,30 +279,30 @@ class ShardsBaseTest(base.V1_1Base):
self._listing_test(count=15, limit=limit)
def test_listing_marker_is_respected(self):
self.simulate_delete(self.shard)
self.simulate_delete(self.pool)
with shards(self, 10, self.doc['uri']) as expected:
result = self.simulate_get(self.url_prefix + '/shards',
with pools(self, 10, self.doc['uri']) as expected:
result = self.simulate_get(self.url_prefix + '/pools',
query_string='?marker=3')
self.assertEqual(self.srmock.status, falcon.HTTP_200)
shard_list = jsonutils.loads(result[0])['shards']
self.assertEqual(len(shard_list), 6)
pool_list = jsonutils.loads(result[0])['pools']
self.assertEqual(len(pool_list), 6)
path, weight = expected[4][:2]
self._shard_expect(shard_list[0], path, weight, self.doc['uri'])
self._pool_expect(pool_list[0], path, weight, self.doc['uri'])
class TestShardsMongoDB(ShardsBaseTest):
class TestPoolsMongoDB(PoolsBaseTest):
config_file = 'wsgi_mongodb.conf'
@testing.requires_mongodb
def setUp(self):
super(TestShardsMongoDB, self).setUp()
super(TestPoolsMongoDB, self).setUp()
class TestShardsSqlalchemy(ShardsBaseTest):
class TestPoolsSqlalchemy(PoolsBaseTest):
config_file = 'wsgi_sqlalchemy.conf'
def setUp(self):
super(TestShardsSqlalchemy, self).setUp()
super(TestPoolsSqlalchemy, self).setUp()

View File

@ -1,5 +1,5 @@
[DEFAULT]
sharding = True
pooling = True
[drivers]
transport = wsgi

View File

@ -1,5 +1,5 @@
[DEFAULT]
sharding = True
pooling = True
[drivers]
transport = wsgi

View File

@ -364,16 +364,16 @@ class MongodbClaimTests(base.ClaimControllerTest):
@testing.requires_mongodb
class MongodbShardsTests(base.ShardsControllerTest):
class MongodbPoolsTests(base.PoolsControllerTest):
driver_class = mongodb.ControlDriver
controller_class = controllers.ShardsController
controller_class = controllers.PoolsController
def setUp(self):
super(MongodbShardsTests, self).setUp()
super(MongodbPoolsTests, self).setUp()
self.load_conf('wsgi_mongodb.conf')
def tearDown(self):
super(MongodbShardsTests, self).tearDown()
super(MongodbPoolsTests, self).tearDown()
@testing.requires_mongodb

View File

@ -78,16 +78,16 @@ class SqlalchemyClaimTests(base.ClaimControllerTest):
controller_class = controllers.ClaimController
class SqlalchemyShardsTest(base.ShardsControllerTest):
class SqlalchemyPoolsTest(base.PoolsControllerTest):
driver_class = sqlalchemy.ControlDriver
controller_class = controllers.ShardsController
controller_class = controllers.PoolsController
def setUp(self):
super(SqlalchemyShardsTest, self).setUp()
super(SqlalchemyPoolsTest, self).setUp()
self.load_conf('wsgi_sqlalchemy.conf')
def tearDown(self):
super(SqlalchemyShardsTest, self).tearDown()
super(SqlalchemyPoolsTest, self).tearDown()
class SqlalchemyCatalogueTest(base.CatalogueControllerTest):

View File

@ -17,7 +17,7 @@ import uuid
from oslo.config import cfg
from marconi.openstack.common.cache import cache as oslo_cache
from marconi.queues.storage import sharding
from marconi.queues.storage import pooling
from marconi.queues.storage import sqlalchemy
from marconi.queues.storage import utils
from marconi import tests as testing
@ -25,14 +25,14 @@ from marconi import tests as testing
# TODO(cpp-cabrera): it would be wonderful to refactor this unit test
# so that it could use multiple control storage backends once those
# have shards/catalogue implementations.
# have pools/catalogue implementations.
@testing.requires_mongodb
class ShardCatalogTest(testing.TestBase):
class PoolCatalogTest(testing.TestBase):
config_file = 'wsgi_mongodb_sharded.conf'
config_file = 'wsgi_mongodb_pooled.conf'
def setUp(self):
super(ShardCatalogTest, self).setUp()
super(PoolCatalogTest, self).setUp()
self.conf.register_opts([cfg.StrOpt('storage')],
group='drivers')
@ -41,20 +41,20 @@ class ShardCatalogTest(testing.TestBase):
control_mode=True)
self.catalogue_ctrl = control.catalogue_controller
self.shards_ctrl = control.shards_controller
self.pools_ctrl = control.pools_controller
# NOTE(cpp-cabrera): populate catalogue
self.shard = str(uuid.uuid1())
self.pool = str(uuid.uuid1())
self.queue = str(uuid.uuid1())
self.project = str(uuid.uuid1())
self.shards_ctrl.create(self.shard, 100, 'sqlite://:memory:')
self.catalogue_ctrl.insert(self.project, self.queue, self.shard)
self.catalog = sharding.Catalog(self.conf, cache, control)
self.pools_ctrl.create(self.pool, 100, 'sqlite://:memory:')
self.catalogue_ctrl.insert(self.project, self.queue, self.pool)
self.catalog = pooling.Catalog(self.conf, cache, control)
def tearDown(self):
self.catalogue_ctrl.drop_all()
self.shards_ctrl.drop_all()
super(ShardCatalogTest, self).tearDown()
self.pools_ctrl.drop_all()
super(PoolCatalogTest, self).tearDown()
def test_lookup_loads_correct_driver(self):
storage = self.catalog.lookup(self.queue, self.project)

View File

@ -19,35 +19,35 @@ from oslo.config import cfg
import six
from marconi.openstack.common.cache import cache as oslo_cache
from marconi.queues.storage import sharding
from marconi.queues.storage import pooling
from marconi.queues.storage import utils
from marconi import tests as testing
@testing.requires_mongodb
class ShardQueuesTest(testing.TestBase):
class PoolQueuesTest(testing.TestBase):
def setUp(self):
super(ShardQueuesTest, self).setUp()
conf = self.load_conf('wsgi_mongodb_sharded.conf')
super(PoolQueuesTest, self).setUp()
conf = self.load_conf('wsgi_mongodb_pooled.conf')
conf.register_opts([cfg.StrOpt('storage')],
group='drivers')
cache = oslo_cache.get_cache()
control = utils.load_storage_driver(conf, cache, control_mode=True)
self.shards_ctrl = control.shards_controller
self.driver = sharding.DataDriver(conf, cache, control)
self.pools_ctrl = control.pools_controller
self.driver = pooling.DataDriver(conf, cache, control)
self.controller = self.driver.queue_controller
# fake two shards
# fake two pools
for _ in six.moves.xrange(2):
self.shards_ctrl.create(str(uuid.uuid1()), 100,
self.pools_ctrl.create(str(uuid.uuid1()), 100,
'sqlite://:memory:')
def tearDown(self):
self.shards_ctrl.drop_all()
super(ShardQueuesTest, self).tearDown()
self.pools_ctrl.drop_all()
super(PoolQueuesTest, self).tearDown()
def test_ping(self):
ping = self.driver.is_alive()

View File

@ -61,7 +61,7 @@ class TestMessagesMongoDB(v1.TestMessagesMongoDB):
url_prefix = URL_PREFIX
class TestMessagesMongoDBSharded(v1.TestMessagesMongoDBSharded):
class TestMessagesMongoDBPooled(v1.TestMessagesMongoDBPooled):
url_prefix = URL_PREFIX
@ -81,11 +81,11 @@ class TestQueueLifecycleSqlalchemy(v1.TestQueueLifecycleSqlalchemy):
url_prefix = URL_PREFIX
class TestShardsMongoDB(v1.TestShardsMongoDB):
class TestPoolsMongoDB(v1.TestPoolsMongoDB):
url_prefix = URL_PREFIX
class TestShardsSqlalchemy(v1.TestShardsSqlalchemy):
class TestPoolsSqlalchemy(v1.TestPoolsSqlalchemy):
url_prefix = URL_PREFIX

View File

@ -60,7 +60,7 @@ class TestMessagesMongoDB(v1_1.TestMessagesMongoDB):
url_prefix = URL_PREFIX
class TestMessagesMongoDBSharded(v1_1.TestMessagesMongoDBSharded):
class TestMessagesMongoDBPooled(v1_1.TestMessagesMongoDBPooled):
url_prefix = URL_PREFIX
@ -82,11 +82,11 @@ class TestQueueLifecycleSqlalchemy(v1_1.TestQueueLifecycleSqlalchemy):
url_prefix = URL_PREFIX
class TestShardsMongoDB(v1_1.TestShardsMongoDB):
class TestPoolsMongoDB(v1_1.TestPoolsMongoDB):
url_prefix = URL_PREFIX
class TestShardsSqlalchemy(v1_1.TestShardsSqlalchemy):
class TestPoolsSqlalchemy(v1_1.TestPoolsSqlalchemy):
url_prefix = URL_PREFIX

View File

@ -16,7 +16,7 @@
from marconi.common import errors
from marconi.queues import bootstrap
from marconi.queues.storage import pipeline
from marconi.queues.storage import sharding
from marconi.queues.storage import pooling
from marconi.queues.storage import sqlalchemy
from marconi.queues.transport import wsgi
from marconi.tests import base
@ -39,10 +39,10 @@ class TestBootstrap(base.TestBase):
self.assertIsInstance(bootstrap.storage._storage,
sqlalchemy.DataDriver)
def test_storage_sqlalchemy_sharded(self):
"""Makes sure we can load the shard driver."""
bootstrap = self._bootstrap('wsgi_sqlalchemy_sharded.conf')
self.assertIsInstance(bootstrap.storage._storage, sharding.DataDriver)
def test_storage_sqlalchemy_pooled(self):
"""Makes sure we can load the pool driver."""
bootstrap = self._bootstrap('wsgi_sqlalchemy_pooled.conf')
self.assertIsInstance(bootstrap.storage._storage, pooling.DataDriver)
def test_transport_invalid(self):
bootstrap = self._bootstrap('drivers_transport_invalid.conf')