From 15d55e7601233d5cb07fdb8a98459758b370d769 Mon Sep 17 00:00:00 2001 From: Flavio Percoco Date: Thu, 29 May 2014 15:25:24 +0200 Subject: [PATCH] Rename shards to pool Shard turned out to be a quite overloaded/confusing term to use here, although correct. We decided to rename our shard feature to pool. P.S: I swear I didn't use sed O.O Change-Id: Ic54f29a4da7d7690c9c9210b74876b96f0ae0eac --- .../common/schemas/{shards.py => pools.py} | 2 +- marconi/queues/bootstrap.py | 21 +-- marconi/queues/storage/__init__.py | 4 +- marconi/queues/storage/base.py | 82 ++++----- marconi/queues/storage/errors.py | 16 +- marconi/queues/storage/mongodb/catalogue.py | 18 +- marconi/queues/storage/mongodb/controllers.py | 4 +- marconi/queues/storage/mongodb/driver.py | 8 +- marconi/queues/storage/mongodb/options.py | 3 - .../storage/mongodb/{shards.py => pools.py} | 30 ++-- .../storage/{sharding.py => pooling.py} | 148 ++++++++-------- .../queues/storage/sqlalchemy/catalogue.py | 18 +- .../queues/storage/sqlalchemy/controllers.py | 4 +- marconi/queues/storage/sqlalchemy/driver.py | 4 +- .../sqlalchemy/{shards.py => pools.py} | 50 +++--- marconi/queues/storage/sqlalchemy/tables.py | 14 +- marconi/queues/storage/utils.py | 4 +- .../queues/transport/wsgi/v1_0/__init__.py | 12 +- .../wsgi/v1_0/{shards.py => pools.py} | 74 ++++---- .../queues/transport/wsgi/v1_1/__init__.py | 12 +- .../wsgi/v1_1/{shards.py => pools.py} | 74 ++++---- marconi/tests/faulty_storage.py | 2 +- marconi/tests/helpers.py | 16 +- marconi/tests/queues/storage/base.py | 154 ++++++++--------- marconi/tests/queues/transport/wsgi/base.py | 4 +- .../queues/transport/wsgi/v1/__init__.py | 16 +- .../queues/transport/wsgi/v1/test_messages.py | 20 +-- .../wsgi/v1/{test_shards.py => test_pools.py} | 160 +++++++++--------- .../queues/transport/wsgi/v1_1/__init__.py | 16 +- .../transport/wsgi/v1_1/test_messages.py | 20 +-- .../v1_1/{test_shards.py => test_pools.py} | 160 +++++++++--------- ..._sharded.conf => wsgi_mongodb_pooled.conf} | 2 +- ...arded.conf => wsgi_sqlalchemy_pooled.conf} | 2 +- .../unit/queues/storage/test_impl_mongodb.py | 8 +- .../queues/storage/test_impl_sqlalchemy.py | 8 +- ..._shard_catalog.py => test_pool_catalog.py} | 24 +-- ...st_shard_queues.py => test_pool_queues.py} | 22 +-- tests/unit/queues/transport/wsgi/test_v1_0.py | 6 +- tests/unit/queues/transport/wsgi/test_v1_1.py | 6 +- tests/unit/test_bootstrap.py | 10 +- 40 files changed, 628 insertions(+), 630 deletions(-) rename marconi/common/schemas/{shards.py => pools.py} (96%) rename marconi/queues/storage/mongodb/{shards.py => pools.py} (83%) rename marconi/queues/storage/{sharding.py => pooling.py} (77%) rename marconi/queues/storage/sqlalchemy/{shards.py => pools.py} (74%) rename marconi/queues/transport/wsgi/v1_0/{shards.py => pools.py} (70%) rename marconi/queues/transport/wsgi/v1_1/{shards.py => pools.py} (70%) rename marconi/tests/queues/transport/wsgi/v1/{test_shards.py => test_pools.py} (66%) rename marconi/tests/queues/transport/wsgi/v1_1/{test_shards.py => test_pools.py} (66%) rename tests/etc/{wsgi_mongodb_sharded.conf => wsgi_mongodb_pooled.conf} (89%) rename tests/etc/{wsgi_sqlalchemy_sharded.conf => wsgi_sqlalchemy_pooled.conf} (78%) rename tests/unit/queues/storage/{test_shard_catalog.py => test_pool_catalog.py} (81%) rename tests/unit/queues/storage/{test_shard_queues.py => test_pool_queues.py} (84%) diff --git a/marconi/common/schemas/shards.py b/marconi/common/schemas/pools.py similarity index 96% rename from marconi/common/schemas/shards.py rename to marconi/common/schemas/pools.py index 3b4d9383d..41cf71fe6 100644 --- a/marconi/common/schemas/shards.py +++ b/marconi/common/schemas/pools.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""shards: JSON schema for marconi-queues shards resources.""" +"""pools: JSON schema for marconi-queues pools resources.""" # NOTE(cpp-cabrera): options can be anything. These will be unique to # each storage driver, so we don't perform any further validation at diff --git a/marconi/queues/bootstrap.py b/marconi/queues/bootstrap.py index 5aa59a4b9..71e143cd4 100644 --- a/marconi/queues/bootstrap.py +++ b/marconi/queues/bootstrap.py @@ -21,7 +21,7 @@ from marconi.common import errors from marconi.openstack.common.cache import cache as oslo_cache from marconi.openstack.common import log from marconi.queues.storage import pipeline -from marconi.queues.storage import sharding +from marconi.queues.storage import pooling from marconi.queues.storage import utils as storage_utils from marconi.queues import transport # NOQA @@ -38,13 +38,14 @@ CONF = cfg.CONF CONF.register_cli_opts(_CLI_OPTIONS) _GENERAL_OPTIONS = ( - cfg.BoolOpt('sharding', default=False, - help=('Enable sharding across multiple storage backends. ', - 'If sharding is enabled, the storage driver ', + cfg.BoolOpt('pooling', default=False, + help=('Enable pooling across multiple storage backends. ', + 'If pooling is enabled, the storage driver ', 'configuration is used to determine where the ', - 'catalogue/control plane data is kept.')), + 'catalogue/control plane data is kept.'), + deprecated_opts=[cfg.DeprecatedOpt('pooling')]), cfg.BoolOpt('admin_mode', default=False, - help='Activate endpoints to manage shard registry.'), + help='Activate endpoints to manage pool registry.'), ) _DRIVER_OPTIONS = ( @@ -81,10 +82,10 @@ class Bootstrap(object): def storage(self): LOG.debug(u'Loading storage driver') - if self.conf.sharding: - LOG.debug(u'Storage sharding enabled') - storage_driver = sharding.DataDriver(self.conf, self.cache, - self.control) + if self.conf.pooling: + LOG.debug(u'Storage pooling enabled') + storage_driver = pooling.DataDriver(self.conf, self.cache, + self.control) else: storage_driver = storage_utils.load_storage_driver( self.conf, self.cache) diff --git a/marconi/queues/storage/__init__.py b/marconi/queues/storage/__init__.py index 967d5f07c..1d80f24d5 100644 --- a/marconi/queues/storage/__init__.py +++ b/marconi/queues/storage/__init__.py @@ -23,10 +23,10 @@ CatalogueBase = base.CatalogueBase Claim = base.Claim Message = base.Message Queue = base.Queue -ShardsBase = base.ShardsBase +PoolsBase = base.PoolsBase DEFAULT_QUEUES_PER_PAGE = base.DEFAULT_QUEUES_PER_PAGE DEFAULT_MESSAGES_PER_PAGE = base.DEFAULT_MESSAGES_PER_PAGE -DEFAULT_SHARDS_PER_PAGE = base.DEFAULT_SHARDS_PER_PAGE +DEFAULT_POOLS_PER_PAGE = base.DEFAULT_POOLS_PER_PAGE DEFAULT_MESSAGES_PER_CLAIM = base.DEFAULT_MESSAGES_PER_CLAIM diff --git a/marconi/queues/storage/base.py b/marconi/queues/storage/base.py index eb0a4a178..8a5320388 100644 --- a/marconi/queues/storage/base.py +++ b/marconi/queues/storage/base.py @@ -20,7 +20,7 @@ import six DEFAULT_QUEUES_PER_PAGE = 10 DEFAULT_MESSAGES_PER_PAGE = 10 -DEFAULT_SHARDS_PER_PAGE = 10 +DEFAULT_POOLS_PER_PAGE = 10 DEFAULT_MESSAGES_PER_CLAIM = 10 @@ -48,7 +48,7 @@ class DataDriverBase(DriverBase): core functionality of the system. Connection information and driver-specific options are - loaded from the config file or the shard catalog. + loaded from the config file or the pool catalog. :param conf: Configuration containing options for this driver. :type conf: `oslo.config.ConfigOpts` @@ -89,8 +89,8 @@ class ControlDriverBase(DriverBase): modify aspects of the functionality of the system. This is ideal for administrative purposes. - Allows access to the shard registry through a catalogue and a - shard controller. + Allows access to the pool registry through a catalogue and a + pool controller. :param conf: Configuration containing options for this driver. :type conf: `oslo.config.ConfigOpts` @@ -105,8 +105,8 @@ class ControlDriverBase(DriverBase): raise NotImplementedError @abc.abstractproperty - def shards_controller(self): - """Returns storage's shard management controller.""" + def pools_controller(self): + """Returns storage's pool management controller.""" raise NotImplementedError @@ -390,71 +390,71 @@ class Claim(ControllerBase): @six.add_metaclass(abc.ABCMeta) -class ShardsBase(ControllerBase): - """A controller for managing shards.""" +class PoolsBase(ControllerBase): + """A controller for managing pools.""" @abc.abstractmethod - def list(self, marker=None, limit=DEFAULT_SHARDS_PER_PAGE, + def list(self, marker=None, limit=DEFAULT_POOLS_PER_PAGE, detailed=False): - """Lists all registered shards. + """Lists all registered pools. - :param marker: used to determine which shard to start with + :param marker: used to determine which pool to start with :type marker: six.text_type :param limit: (Default 10) Max number of results to return :type limit: int :param detailed: whether to include options :type detailed: bool - :returns: A list of shards - name, weight, uri + :returns: A list of pools - name, weight, uri :rtype: [{}] """ raise NotImplementedError @abc.abstractmethod def create(self, name, weight, uri, options=None): - """Registers a shard entry. + """Registers a pool entry. - :param name: The name of this shard + :param name: The name of this pool :type name: six.text_type - :param weight: the likelihood that this shard will be used + :param weight: the likelihood that this pool will be used :type weight: int :param uri: A URI that can be used by a storage client - (e.g., pymongo) to access this shard. + (e.g., pymongo) to access this pool. :type uri: six.text_type - :param options: Options used to configure this shard + :param options: Options used to configure this pool :type options: dict """ raise NotImplementedError @abc.abstractmethod def get(self, name, detailed=False): - """Returns a single shard entry. + """Returns a single pool entry. - :param name: The name of this shard + :param name: The name of this pool :type name: six.text_type :param detailed: Should the options data be included? :type detailed: bool - :returns: weight, uri, and options for this shard + :returns: weight, uri, and options for this pool :rtype: {} - :raises: ShardDoesNotExist if not found + :raises: PoolDoesNotExist if not found """ raise NotImplementedError @abc.abstractmethod def exists(self, name): - """Returns a single shard entry. + """Returns a single pool entry. - :param name: The name of this shard + :param name: The name of this pool :type name: six.text_type - :returns: True if the shard exists + :returns: True if the pool exists :rtype: bool """ raise NotImplementedError @abc.abstractmethod def delete(self, name): - """Removes a shard entry. + """Removes a pool entry. - :param name: The name of this shard + :param name: The name of this pool :type name: six.text_type :rtype: None """ @@ -462,19 +462,19 @@ class ShardsBase(ControllerBase): @abc.abstractmethod def update(self, name, **kwargs): - """Updates the weight, uris, and/or options of this shard + """Updates the weight, uris, and/or options of this pool - :param name: Name of the shard + :param name: Name of the pool :type name: text :param kwargs: one of: `uri`, `weight`, `options` :type kwargs: dict - :raises: ShardDoesNotExist + :raises: PoolDoesNotExist """ raise NotImplementedError @abc.abstractmethod def drop_all(self): - """Deletes all shards from storage.""" + """Deletes all pools from storage.""" raise NotImplementedError @@ -482,7 +482,7 @@ class ShardsBase(ControllerBase): class CatalogueBase(ControllerBase): """A controller for managing the catalogue. The catalogue is responsible for maintaining a mapping between project.queue - entries to their shard. + entries to their pool. """ @abc.abstractmethod @@ -493,21 +493,21 @@ class CatalogueBase(ControllerBase): :param project: The project to use when filtering through queue entries. :type project: six.text_type - :returns: [{'project': ..., 'queue': ..., 'shard': ...},] + :returns: [{'project': ..., 'queue': ..., 'pool': ...},] :rtype: [dict] """ raise NotImplementedError @abc.abstractmethod def get(self, project, queue): - """Returns the shard identifier for the queue registered under this + """Returns the pool identifier for the queue registered under this project. :param project: Namespace to search for the given queue :type project: six.text_type :param queue: The name of the queue to search for :type queue: six.text_type - :returns: {'shard': ...} + :returns: {'pool': ...} :rtype: dict :raises: QueueNotMapped """ @@ -526,15 +526,15 @@ class CatalogueBase(ControllerBase): """ @abc.abstractmethod - def insert(self, project, queue, shard): + def insert(self, project, queue, pool): """Creates a new catalogue entry, or updates it if it already existed. :param project: str - Namespace to insert the given queue into :type project: six.text_type :param queue: str - The name of the queue to insert :type queue: six.text_type - :param shard: shard identifier to associate this queue with - :type shard: six.text_type + :param pool: pool identifier to associate this queue with + :type pool: six.text_type """ raise NotImplementedError @@ -550,15 +550,15 @@ class CatalogueBase(ControllerBase): raise NotImplementedError @abc.abstractmethod - def update(self, project, queue, shards=None): - """Updates the shard identifier for this queue + def update(self, project, queue, pools=None): + """Updates the pool identifier for this queue :param project: Namespace to search :type project: six.text_type :param queue: The name of the queue :type queue: six.text_type - :param shards: The name of the shard where this project/queue lives. - :type shards: six.text_type + :param pools: The name of the pool where this project/queue lives. + :type pools: six.text_type :raises: QueueNotMapped """ raise NotImplementedError diff --git a/marconi/queues/storage/errors.py b/marconi/queues/storage/errors.py index 8d9e67e2f..340688361 100644 --- a/marconi/queues/storage/errors.py +++ b/marconi/queues/storage/errors.py @@ -112,7 +112,7 @@ class ClaimDoesNotExist(DoesNotExist): class QueueNotMapped(DoesNotExist): - msg_format = (u'No shard found for ' + msg_format = (u'No pool found for ' u'queue {queue} for project {project}') def __init__(self, queue, project): @@ -127,17 +127,17 @@ class MessageIsClaimedBy(NotPermitted): super(MessageIsClaimedBy, self).__init__(cid=cid, mid=mid) -class ShardDoesNotExist(DoesNotExist): +class PoolDoesNotExist(DoesNotExist): - msg_format = u'Shard {shard} does not exist' + msg_format = u'Pool {pool} does not exist' - def __init__(self, shard): - super(ShardDoesNotExist, self).__init__(shard=shard) + def __init__(self, pool): + super(PoolDoesNotExist, self).__init__(pool=pool) -class NoShardFound(ExceptionBase): +class NoPoolFound(ExceptionBase): - msg_format = u'No shards registered' + msg_format = u'No pools registered' def __init__(self): - super(NoShardFound, self).__init__() + super(NoPoolFound, self).__init__() diff --git a/marconi/queues/storage/mongodb/catalogue.py b/marconi/queues/storage/mongodb/catalogue.py index d223cfaf4..e1b6c55de 100644 --- a/marconi/queues/storage/mongodb/catalogue.py +++ b/marconi/queues/storage/mongodb/catalogue.py @@ -15,11 +15,11 @@ """MongoDB storage controller for the queues catalogue. -Serves to construct an association between a project + queue -> shard +Serves to construct an association between a project + queue -> pool { 'p_q': project_queue :: six.text_type, - 's': shard_identifier :: six.text_type + 's': pool_identifier :: six.text_type } """ @@ -47,10 +47,10 @@ class CatalogueController(base.CatalogueBase): self._col.ensure_index(CATALOGUE_INDEX, unique=True) @utils.raises_conn_error - def _insert(self, project, queue, shard, upsert): + def _insert(self, project, queue, pool, upsert): key = utils.scope_queue_name(queue, project) return self._col.update({PRIMARY_KEY: key}, - {'$set': {'s': shard}}, upsert=upsert) + {'$set': {'s': pool}}, upsert=upsert) @utils.raises_conn_error def list(self, project): @@ -77,18 +77,18 @@ class CatalogueController(base.CatalogueBase): key = utils.scope_queue_name(queue, project) return self._col.find_one({PRIMARY_KEY: key}) is not None - def insert(self, project, queue, shard): + def insert(self, project, queue, pool): # NOTE(cpp-cabrera): _insert handles conn_error - self._insert(project, queue, shard, upsert=True) + self._insert(project, queue, pool, upsert=True) @utils.raises_conn_error def delete(self, project, queue): self._col.remove({PRIMARY_KEY: utils.scope_queue_name(queue, project)}, w=0) - def update(self, project, queue, shard=None): + def update(self, project, queue, pool=None): # NOTE(cpp-cabrera): _insert handles conn_error - res = self._insert(project, queue, shard, upsert=False) + res = self._insert(project, queue, pool, upsert=False) if not res['updatedExisting']: raise errors.QueueNotMapped(project, queue) @@ -104,5 +104,5 @@ def _normalize(entry): return { 'queue': queue, 'project': project, - 'shard': entry['s'] + 'pool': entry['s'] } diff --git a/marconi/queues/storage/mongodb/controllers.py b/marconi/queues/storage/mongodb/controllers.py index 2ab5a1c9a..efbda28b2 100644 --- a/marconi/queues/storage/mongodb/controllers.py +++ b/marconi/queues/storage/mongodb/controllers.py @@ -25,12 +25,12 @@ Field Mappings: from marconi.queues.storage.mongodb import catalogue from marconi.queues.storage.mongodb import claims from marconi.queues.storage.mongodb import messages +from marconi.queues.storage.mongodb import pools from marconi.queues.storage.mongodb import queues -from marconi.queues.storage.mongodb import shards CatalogueController = catalogue.CatalogueController ClaimController = claims.ClaimController MessageController = messages.MessageController QueueController = queues.QueueController -ShardsController = shards.ShardsController +PoolsController = pools.PoolsController diff --git a/marconi/queues/storage/mongodb/driver.py b/marconi/queues/storage/mongodb/driver.py index 2f6c0d00d..5e43a9db2 100644 --- a/marconi/queues/storage/mongodb/driver.py +++ b/marconi/queues/storage/mongodb/driver.py @@ -45,8 +45,8 @@ class DataDriver(storage.DataDriverBase): opts = options.MONGODB_OPTIONS # NOTE(cpp-cabrera): if this data driver is being loaded - # dynamically, as would be the case for a sharded context, - # filter out the options that were given by the shard + # dynamically, as would be the case for a pooled context, + # filter out the options that were given by the pool # catalogue to avoid DuplicateOptErrors. if 'dynamic' in conf: names = conf[options.MONGODB_GROUP].keys() @@ -131,8 +131,8 @@ class ControlDriver(storage.ControlDriverBase): return self.connection[name] @property - def shards_controller(self): - return controllers.ShardsController(self) + def pools_controller(self): + return controllers.PoolsController(self) @property def catalogue_controller(self): diff --git a/marconi/queues/storage/mongodb/options.py b/marconi/queues/storage/mongodb/options.py index 9ef7fdb81..b27e26196 100644 --- a/marconi/queues/storage/mongodb/options.py +++ b/marconi/queues/storage/mongodb/options.py @@ -20,9 +20,6 @@ from oslo.config import cfg MONGODB_OPTIONS = ( cfg.StrOpt('uri', help='Mongodb Connection URI.'), - # Database name - # TODO(kgriffs): Consider local sharding across DBs to mitigate - # per-DB locking latency. cfg.StrOpt('database', default='marconi', help='Database name.'), cfg.IntOpt('partitions', default=2, diff --git a/marconi/queues/storage/mongodb/shards.py b/marconi/queues/storage/mongodb/pools.py similarity index 83% rename from marconi/queues/storage/mongodb/shards.py rename to marconi/queues/storage/mongodb/pools.py index 5f863807c..420b4f168 100644 --- a/marconi/queues/storage/mongodb/shards.py +++ b/marconi/queues/storage/mongodb/pools.py @@ -12,7 +12,7 @@ # License for the specific language governing permissions and limitations under # the License. -"""shards: an implementation of the shard management storage +"""pools: an implementation of the pool management storage controller for mongodb. Schema: @@ -29,7 +29,7 @@ from marconi.queues.storage import base from marconi.queues.storage import errors from marconi.queues.storage.mongodb import utils -SHARDS_INDEX = [ +POOLS_INDEX = [ ('n', 1) ] @@ -42,15 +42,15 @@ def _field_spec(detailed=False): return dict(OMIT_FIELDS + (() if detailed else (('o', False),))) -class ShardsController(base.ShardsBase): +class PoolsController(base.PoolsBase): def __init__(self, *args, **kwargs): - super(ShardsController, self).__init__(*args, **kwargs) + super(PoolsController, self).__init__(*args, **kwargs) - self._col = self.driver.database.shards - self._col.ensure_index(SHARDS_INDEX, + self._col = self.driver.database.pools + self._col.ensure_index(POOLS_INDEX, background=True, - name='shards_name', + name='pools_name', unique=True) @utils.raises_conn_error @@ -69,7 +69,7 @@ class ShardsController(base.ShardsBase): res = self._col.find_one({'n': name}, _field_spec(detailed)) if not res: - raise errors.ShardDoesNotExist(name) + raise errors.PoolDoesNotExist(name) return _normalize(res, detailed) @@ -96,7 +96,7 @@ class ShardsController(base.ShardsBase): {'$set': fields}, upsert=False) if not res['updatedExisting']: - raise errors.ShardDoesNotExist(name) + raise errors.PoolDoesNotExist(name) @utils.raises_conn_error def delete(self, name): @@ -105,16 +105,16 @@ class ShardsController(base.ShardsBase): @utils.raises_conn_error def drop_all(self): self._col.drop() - self._col.ensure_index(SHARDS_INDEX, unique=True) + self._col.ensure_index(POOLS_INDEX, unique=True) -def _normalize(shard, detailed=False): +def _normalize(pool, detailed=False): ret = { - 'name': shard['n'], - 'uri': shard['u'], - 'weight': shard['w'], + 'name': pool['n'], + 'uri': pool['u'], + 'weight': pool['w'], } if detailed: - ret['options'] = shard['o'] + ret['options'] = pool['o'] return ret diff --git a/marconi/queues/storage/sharding.py b/marconi/queues/storage/pooling.py similarity index 77% rename from marconi/queues/storage/sharding.py rename to marconi/queues/storage/pooling.py index 1e23f7e90..31bee4f5c 100644 --- a/marconi/queues/storage/sharding.py +++ b/marconi/queues/storage/pooling.py @@ -31,10 +31,10 @@ _CATALOG_OPTIONS = ( help='Catalog storage driver.'), ) -_CATALOG_GROUP = 'sharding:catalog' +_CATALOG_GROUP = 'pooling:catalog' -# NOTE(kgriffs): E.g.: 'marconi-sharding:5083853/my-queue' -_SHARD_CACHE_PREFIX = 'sharding:' +# NOTE(kgriffs): E.g.: 'marconi-pooling:5083853/my-queue' +_POOL_CACHE_PREFIX = 'pooling:' # TODO(kgriffs): If a queue is migrated, everyone's # caches need to have the relevant entry invalidated @@ -42,58 +42,58 @@ _SHARD_CACHE_PREFIX = 'sharding:' # on the TTL. # # TODO(kgriffs): Make dynamic? -_SHARD_CACHE_TTL = 10 +_POOL_CACHE_TTL = 10 def _config_options(): return [(_CATALOG_GROUP, _CATALOG_OPTIONS)] -def _shard_cache_key(queue, project=None): +def _pool_cache_key(queue, project=None): # NOTE(kgriffs): Use string concatenation for performance, # also put project first since it is guaranteed to be # unique, which should reduce lookup time. - return _SHARD_CACHE_PREFIX + str(project) + '/' + queue + return _POOL_CACHE_PREFIX + str(project) + '/' + queue class DataDriver(storage.DataDriverBase): - """Sharding meta-driver for routing requests to multiple backends. + """Pooling meta-driver for routing requests to multiple backends. - :param conf: Configuration from which to read sharding options + :param conf: Configuration from which to read pooling options :param cache: Cache instance that will be passed to individual - storage driver instances that correspond to each shard. will - also be used by the shard controller to reduce latency for + storage driver instances that correspond to each pool. will + also be used by the pool controller to reduce latency for some operations. """ def __init__(self, conf, cache, control): super(DataDriver, self).__init__(conf, cache) - self._shard_catalog = Catalog(conf, cache, control) + self._pool_catalog = Catalog(conf, cache, control) def is_alive(self): - return all(self._shard_catalog.get_driver(shard['name']).is_alive() - for shard in - self._shard_catalog._shards_ctrl.list(limit=0)) + return all(self._pool_catalog.get_driver(pool['name']).is_alive() + for pool in + self._pool_catalog._pools_ctrl.list(limit=0)) @decorators.lazy_property(write=False) def queue_controller(self): - return QueueController(self._shard_catalog) + return QueueController(self._pool_catalog) @decorators.lazy_property(write=False) def message_controller(self): - return MessageController(self._shard_catalog) + return MessageController(self._pool_catalog) @decorators.lazy_property(write=False) def claim_controller(self): - return ClaimController(self._shard_catalog) + return ClaimController(self._pool_catalog) class RoutingController(storage.base.ControllerBase): - """Routes operations to the appropriate shard. + """Routes operations to the appropriate pool. This controller stands in for a regular storage controller, routing operations to a driver instance that represents - the shard to which the queue has been assigned. + the pool to which the queue has been assigned. Do not instantiate this class directly; use one of the more specific child classes instead. @@ -101,16 +101,16 @@ class RoutingController(storage.base.ControllerBase): _resource_name = None - def __init__(self, shard_catalog): + def __init__(self, pool_catalog): super(RoutingController, self).__init__(None) self._ctrl_property_name = self._resource_name + '_controller' - self._shard_catalog = shard_catalog + self._pool_catalog = pool_catalog @decorators.memoized_getattr def __getattr__(self, name): # NOTE(kgriffs): Use a closure trick to avoid # some attr lookups each time forward() is called. - lookup = self._shard_catalog.lookup + lookup = self._pool_catalog.lookup # NOTE(kgriffs): Assume that every controller method # that is exposed to the transport declares queue name @@ -132,16 +132,16 @@ class QueueController(RoutingController): _resource_name = 'queue' - def __init__(self, shard_catalog): - super(QueueController, self).__init__(shard_catalog) - self._lookup = self._shard_catalog.lookup + def __init__(self, pool_catalog): + super(QueueController, self).__init__(pool_catalog) + self._lookup = self._pool_catalog.lookup def list(self, project=None, marker=None, limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False): def all_pages(): - for shard in self._shard_catalog._shards_ctrl.list(limit=0): - yield next(self._shard_catalog.get_driver(shard['name']) + for pool in self._pool_catalog._pools_ctrl.list(limit=0): + yield next(self._pool_catalog.get_driver(pool['name']) .queue_controller.list( project=project, marker=marker, @@ -166,7 +166,7 @@ class QueueController(RoutingController): yield marker_name['next'] def create(self, name, project=None): - self._shard_catalog.register(name, project) + self._pool_catalog.register(name, project) # NOTE(cpp-cabrera): This should always succeed since we just # registered the project/queue. There is a race condition, @@ -195,7 +195,7 @@ class QueueController(RoutingController): # latter case is more difficult to reason about, and may # yield 500s in some operations. control = target.queue_controller - self._shard_catalog.deregister(name, project) + self._pool_catalog.deregister(name, project) ret = control.delete(name, project) return ret @@ -234,9 +234,9 @@ class QueueController(RoutingController): class MessageController(RoutingController): _resource_name = 'message' - def __init__(self, shard_catalog): - super(MessageController, self).__init__(shard_catalog) - self._lookup = self._shard_catalog.lookup + def __init__(self, pool_catalog): + super(MessageController, self).__init__(pool_catalog) + self._lookup = self._pool_catalog.lookup def post(self, queue, project, messages, client_uuid): target = self._lookup(queue, project) @@ -294,9 +294,9 @@ class MessageController(RoutingController): class ClaimController(RoutingController): _resource_name = 'claim' - def __init__(self, shard_catalog): - super(ClaimController, self).__init__(shard_catalog) - self._lookup = self._shard_catalog.lookup + def __init__(self, pool_catalog): + super(ClaimController, self).__init__(pool_catalog) + self._lookup = self._pool_catalog.lookup def create(self, queue, metadata, project=None, limit=None): target = self._lookup(queue, project) @@ -332,7 +332,7 @@ class ClaimController(RoutingController): class Catalog(object): - """Represents the mapping between queues and shard drivers.""" + """Represents the mapping between queues and pool drivers.""" def __init__(self, conf, cache, control): self._drivers = {} @@ -342,74 +342,74 @@ class Catalog(object): self._conf.register_opts(_CATALOG_OPTIONS, group=_CATALOG_GROUP) self._catalog_conf = self._conf[_CATALOG_GROUP] - self._shards_ctrl = control.shards_controller + self._pools_ctrl = control.pools_controller self._catalogue_ctrl = control.catalogue_controller # FIXME(cpp-cabrera): https://bugs.launchpad.net/marconi/+bug/1252791 - def _init_driver(self, shard_id): - """Given a shard name, returns a storage driver. + def _init_driver(self, pool_id): + """Given a pool name, returns a storage driver. - :param shard_id: The name of a shard. - :type shard_id: six.text_type + :param pool_id: The name of a pool. + :type pool_id: six.text_type :returns: a storage driver :rtype: marconi.queues.storage.base.DataDriver """ - shard = self._shards_ctrl.get(shard_id, detailed=True) - conf = utils.dynamic_conf(shard['uri'], shard['options']) + pool = self._pools_ctrl.get(pool_id, detailed=True) + conf = utils.dynamic_conf(pool['uri'], pool['options']) return utils.load_storage_driver(conf, self._cache) - @decorators.caches(_shard_cache_key, _SHARD_CACHE_TTL) - def _shard_id(self, queue, project=None): - """Get the ID for the shard assigned to the given queue. + @decorators.caches(_pool_cache_key, _POOL_CACHE_TTL) + def _pool_id(self, queue, project=None): + """Get the ID for the pool assigned to the given queue. :param queue: name of the queue :param project: project to which the queue belongs - :returns: shard id + :returns: pool id :raises: `errors.QueueNotMapped` """ - return self._catalogue_ctrl.get(project, queue)['shard'] + return self._catalogue_ctrl.get(project, queue)['pool'] def register(self, queue, project=None): - """Register a new queue in the shard catalog. + """Register a new queue in the pool catalog. This method should be called whenever a new queue is being - created, and will create an entry in the shard catalog for + created, and will create an entry in the pool catalog for the given queue. After using this method to register the queue in the catalog, the caller should call `lookup()` to get a reference to a storage driver which will allow interacting with the - queue's assigned backend shard. + queue's assigned backend pool. - :param queue: Name of the new queue to assign to a shard + :param queue: Name of the new queue to assign to a pool :type queue: six.text_type :param project: Project to which the queue belongs, or None for the "global" or "generic" project. :type project: six.text_type - :raises: NoShardFound + :raises: NoPoolFound """ # NOTE(cpp-cabrera): only register a queue if the entry # doesn't exist if not self._catalogue_ctrl.exists(project, queue): # NOTE(cpp-cabrera): limit=0 implies unlimited - select from - # all shards - shard = select.weighted(self._shards_ctrl.list(limit=0)) + # all pools + pool = select.weighted(self._pools_ctrl.list(limit=0)) - if not shard: - raise errors.NoShardFound() + if not pool: + raise errors.NoPoolFound() - self._catalogue_ctrl.insert(project, queue, shard['name']) + self._catalogue_ctrl.insert(project, queue, pool['name']) - @_shard_id.purges + @_pool_id.purges def deregister(self, queue, project=None): - """Removes a queue from the shard catalog. + """Removes a queue from the pool catalog. Call this method after successfully deleting it from a - backend shard. + backend pool. - :param queue: Name of the new queue to assign to a shard + :param queue: Name of the new queue to assign to a pool :type queue: six.text_type :param project: Project to which the queue belongs, or None for the "global" or "generic" project. @@ -418,20 +418,20 @@ class Catalog(object): self._catalogue_ctrl.delete(project, queue) def lookup(self, queue, project=None): - """Lookup a shard driver for the given queue and project. + """Lookup a pool driver for the given queue and project. - :param queue: Name of the queue for which to find a shard + :param queue: Name of the queue for which to find a pool :param project: Project to which the queue belongs, or None to specify the "global" or "generic" project. - :returns: A storage driver instance for the appropriate shard. If + :returns: A storage driver instance for the appropriate pool. If the driver does not exist yet, it is created and cached. If the queue is not mapped, returns None. :rtype: Maybe DataDriver """ try: - shard_id = self._shard_id(queue, project) + pool_id = self._pool_id(queue, project) except errors.QueueNotMapped as ex: LOG.debug(ex) @@ -441,21 +441,21 @@ class Catalog(object): # the place. return None - return self.get_driver(shard_id) + return self.get_driver(pool_id) - def get_driver(self, shard_id): - """Get storage driver, preferably cached, from a shard name. + def get_driver(self, pool_id): + """Get storage driver, preferably cached, from a pool name. - :param shard_id: The name of a shard. - :type shard_id: six.text_type + :param pool_id: The name of a pool. + :type pool_id: six.text_type :returns: a storage driver :rtype: marconi.queues.storage.base.DataDriver """ try: - return self._drivers[shard_id] + return self._drivers[pool_id] except KeyError: # NOTE(cpp-cabrera): cache storage driver connection - self._drivers[shard_id] = self._init_driver(shard_id) + self._drivers[pool_id] = self._init_driver(pool_id) - return self._drivers[shard_id] + return self._drivers[pool_id] diff --git a/marconi/queues/storage/sqlalchemy/catalogue.py b/marconi/queues/storage/sqlalchemy/catalogue.py index 390553a51..37da407d4 100644 --- a/marconi/queues/storage/sqlalchemy/catalogue.py +++ b/marconi/queues/storage/sqlalchemy/catalogue.py @@ -15,9 +15,9 @@ """Sql storage controller for the queues catalogue. -Serves to construct an association between a project + queue -> shard +Serves to construct an association between a project + queue -> pool -name: string -> Shards.name +name: string -> Pools.name project: string queue: string """ @@ -68,15 +68,15 @@ class CatalogueController(base.CatalogueBase): except errors.QueueNotMapped: return False - def insert(self, project, queue, shard): + def insert(self, project, queue, pool): try: stmt = sa.sql.insert(tables.Catalogue).values( - project=project, queue=queue, shard=shard + project=project, queue=queue, pool=pool ) self._conn.execute(stmt) except sa.exc.IntegrityError: - self.update(project, queue, shard) + self.update(project, queue, pool) def delete(self, project, queue): stmt = sa.sql.delete(tables.Catalogue).where( @@ -84,8 +84,8 @@ class CatalogueController(base.CatalogueBase): ) self._conn.execute(stmt) - def update(self, project, queue, shard=None): - if shard is None: + def update(self, project, queue, pool=None): + if pool is None: return if not self.exists(project, queue): @@ -93,7 +93,7 @@ class CatalogueController(base.CatalogueBase): stmt = sa.sql.update(tables.Catalogue).where( _match(project, queue) - ).values(shard=shard) + ).values(pool=pool) self._conn.execute(stmt) def drop_all(self): @@ -106,5 +106,5 @@ def _normalize(entry): return { 'queue': queue, 'project': project, - 'shard': name + 'pool': name } diff --git a/marconi/queues/storage/sqlalchemy/controllers.py b/marconi/queues/storage/sqlalchemy/controllers.py index ec7c2abfe..07e7407e7 100644 --- a/marconi/queues/storage/sqlalchemy/controllers.py +++ b/marconi/queues/storage/sqlalchemy/controllers.py @@ -16,12 +16,12 @@ from marconi.queues.storage.sqlalchemy import catalogue from marconi.queues.storage.sqlalchemy import claims from marconi.queues.storage.sqlalchemy import messages +from marconi.queues.storage.sqlalchemy import pools from marconi.queues.storage.sqlalchemy import queues -from marconi.queues.storage.sqlalchemy import shards QueueController = queues.QueueController ClaimController = claims.ClaimController MessageController = messages.MessageController CatalogueController = catalogue.CatalogueController -ShardsController = shards.ShardsController +PoolsController = pools.PoolsController diff --git a/marconi/queues/storage/sqlalchemy/driver.py b/marconi/queues/storage/sqlalchemy/driver.py index e75e558d4..f02c0d4b3 100644 --- a/marconi/queues/storage/sqlalchemy/driver.py +++ b/marconi/queues/storage/sqlalchemy/driver.py @@ -140,8 +140,8 @@ class ControlDriver(storage.ControlDriverBase): self.connection.close() @property - def shards_controller(self): - return controllers.ShardsController(self) + def pools_controller(self): + return controllers.PoolsController(self) @property def catalogue_controller(self): diff --git a/marconi/queues/storage/sqlalchemy/shards.py b/marconi/queues/storage/sqlalchemy/pools.py similarity index 74% rename from marconi/queues/storage/sqlalchemy/shards.py rename to marconi/queues/storage/sqlalchemy/pools.py index 6815c4812..bd6be36d2 100644 --- a/marconi/queues/storage/sqlalchemy/shards.py +++ b/marconi/queues/storage/sqlalchemy/pools.py @@ -12,7 +12,7 @@ # License for the specific language governing permissions and limitations under # the License. -"""shards: an implementation of the shard management storage +"""pools: an implementation of the pool management storage controller for sqlalchemy. Schema: @@ -33,10 +33,10 @@ from marconi.queues.storage.sqlalchemy import tables from marconi.queues.storage.sqlalchemy import utils -class ShardsController(base.ShardsBase): +class PoolsController(base.PoolsBase): def __init__(self, *args, **kwargs): - super(ShardsController, self).__init__(*args, **kwargs) + super(PoolsController, self).__init__(*args, **kwargs) self._conn = self.driver.connection @@ -47,8 +47,8 @@ class ShardsController(base.ShardsBase): # TODO(cpp-cabrera): optimization - limit the columns returned # when detailed=False by specifying them in the select() # clause - stmt = sa.sql.select([tables.Shards]).where( - tables.Shards.c.name > marker + stmt = sa.sql.select([tables.Pools]).where( + tables.Pools.c.name > marker ).limit(limit) cursor = self._conn.execute(stmt) @@ -57,15 +57,15 @@ class ShardsController(base.ShardsBase): @utils.raises_conn_error def get(self, name, detailed=False): - stmt = sa.sql.select([tables.Shards]).where( - tables.Shards.c.name == name + stmt = sa.sql.select([tables.Pools]).where( + tables.Pools.c.name == name ) - shard = self._conn.execute(stmt).fetchone() - if shard is None: - raise errors.ShardDoesNotExist(name) + pool = self._conn.execute(stmt).fetchone() + if pool is None: + raise errors.PoolDoesNotExist(name) - return _normalize(shard, detailed) + return _normalize(pool, detailed) # TODO(cpp-cabrera): rename to upsert @utils.raises_conn_error @@ -73,7 +73,7 @@ class ShardsController(base.ShardsBase): opts = None if options is None else utils.json_encode(options) try: - stmt = sa.sql.expression.insert(tables.Shards).values( + stmt = sa.sql.expression.insert(tables.Pools).values( name=name, weight=weight, uri=uri, options=opts ) self._conn.execute(stmt) @@ -86,8 +86,8 @@ class ShardsController(base.ShardsBase): @utils.raises_conn_error def exists(self, name): - stmt = sa.sql.select([tables.Shards.c.name]).where( - tables.Shards.c.name == name + stmt = sa.sql.select([tables.Pools.c.name]).where( + tables.Pools.c.name == name ).limit(1) return self._conn.execute(stmt).fetchone() is not None @@ -105,34 +105,34 @@ class ShardsController(base.ShardsBase): if 'options' in fields: fields['options'] = utils.json_encode(fields['options']) - stmt = sa.sql.update(tables.Shards).where( - tables.Shards.c.name == name).values(**fields) + stmt = sa.sql.update(tables.Pools).where( + tables.Pools.c.name == name).values(**fields) res = self._conn.execute(stmt) if res.rowcount == 0: - raise errors.ShardDoesNotExist(name) + raise errors.PoolDoesNotExist(name) @utils.raises_conn_error def delete(self, name): - stmt = sa.sql.expression.delete(tables.Shards).where( - tables.Shards.c.name == name + stmt = sa.sql.expression.delete(tables.Pools).where( + tables.Pools.c.name == name ) self._conn.execute(stmt) @utils.raises_conn_error def drop_all(self): - stmt = sa.sql.expression.delete(tables.Shards) + stmt = sa.sql.expression.delete(tables.Pools) self._conn.execute(stmt) -def _normalize(shard, detailed=False): +def _normalize(pool, detailed=False): ret = { - 'name': shard[0], - 'uri': shard[1], - 'weight': shard[2], + 'name': pool[0], + 'uri': pool[1], + 'weight': pool[2], } if detailed: - opts = shard[3] + opts = pool[3] ret['options'] = utils.json_decode(opts) if opts else None return ret diff --git a/marconi/queues/storage/sqlalchemy/tables.py b/marconi/queues/storage/sqlalchemy/tables.py index d4081e8ad..e648a576e 100644 --- a/marconi/queues/storage/sqlalchemy/tables.py +++ b/marconi/queues/storage/sqlalchemy/tables.py @@ -57,16 +57,16 @@ Queues = sa.Table('Queues', metadata, ) -Shards = sa.Table('Shards', metadata, - sa.Column('name', sa.String(64), primary_key=True), - sa.Column('uri', sa.String(255), nullable=False), - sa.Column('weight', sa.INTEGER, nullable=False), - sa.Column('options', sa.BINARY)) +Pools = sa.Table('Pools', metadata, + sa.Column('name', sa.String(64), primary_key=True), + sa.Column('uri', sa.String(255), nullable=False), + sa.Column('weight', sa.INTEGER, nullable=False), + sa.Column('options', sa.BINARY)) Catalogue = sa.Table('Catalogue', metadata, - sa.Column('shard', sa.String(64), - sa.ForeignKey('Shards.name', + sa.Column('pool', sa.String(64), + sa.ForeignKey('Pools.name', ondelete='CASCADE'), primary_key=True), sa.Column('project', sa.String(64), nullable=False), diff --git a/marconi/queues/storage/utils.py b/marconi/queues/storage/utils.py index 879f2c85f..aa0492078 100644 --- a/marconi/queues/storage/utils.py +++ b/marconi/queues/storage/utils.py @@ -26,9 +26,9 @@ LOG = log.getLogger(__name__) def dynamic_conf(uri, options): """Given metadata, yields a dynamic configuration. - :param uri: shard location + :param uri: pool location :type uri: six.text_type - :param options: additional shard metadata + :param options: additional pool metadata :type options: dict :returns: Configuration object suitable for constructing storage drivers diff --git a/marconi/queues/transport/wsgi/v1_0/__init__.py b/marconi/queues/transport/wsgi/v1_0/__init__.py index 4bfbaf5cc..3a755c560 100644 --- a/marconi/queues/transport/wsgi/v1_0/__init__.py +++ b/marconi/queues/transport/wsgi/v1_0/__init__.py @@ -17,8 +17,8 @@ from marconi.queues.transport.wsgi.v1_0 import health from marconi.queues.transport.wsgi.v1_0 import homedoc from marconi.queues.transport.wsgi.v1_0 import messages from marconi.queues.transport.wsgi.v1_0 import metadata +from marconi.queues.transport.wsgi.v1_0 import pools from marconi.queues.transport.wsgi.v1_0 import queues -from marconi.queues.transport.wsgi.v1_0 import shards from marconi.queues.transport.wsgi.v1_0 import stats @@ -70,11 +70,11 @@ def public_endpoints(driver): def private_endpoints(driver): - shards_controller = driver._control.shards_controller + pools_controller = driver._control.pools_controller return [ - ('/shards', - shards.Listing(shards_controller)), - ('/shards/{shard}', - shards.Resource(shards_controller)), + ('/pools', + pools.Listing(pools_controller)), + ('/pools/{pool}', + pools.Resource(pools_controller)), ] diff --git a/marconi/queues/transport/wsgi/v1_0/shards.py b/marconi/queues/transport/wsgi/v1_0/pools.py similarity index 70% rename from marconi/queues/transport/wsgi/v1_0/shards.py rename to marconi/queues/transport/wsgi/v1_0/pools.py index 988e3ecf3..d59ac0165 100644 --- a/marconi/queues/transport/wsgi/v1_0/shards.py +++ b/marconi/queues/transport/wsgi/v1_0/pools.py @@ -13,10 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""shards: a resource to handle storage shard management +"""pools: a resource to handle storage pool management -A shard is added by an operator by interacting with the -sharding-related endpoints. When specifying a shard, the +A pool is added by an operator by interacting with the +pooling-related endpoints. When specifying a pool, the following fields are required: { @@ -25,7 +25,7 @@ following fields are required: "uri": string::uri } -Furthermore, depending on the underlying storage type of shard being +Furthermore, depending on the underlying storage type of pool being registered, there is an optional field: { @@ -36,7 +36,7 @@ registered, there is an optional field: import falcon import jsonschema -from marconi.common.schemas import shards as schema +from marconi.common.schemas import pools as schema from marconi.common.transport.wsgi import utils from marconi.common import utils as common_utils from marconi.openstack.common import log @@ -49,15 +49,15 @@ LOG = log.getLogger(__name__) class Listing(object): - """A resource to list registered shards + """A resource to list registered pools - :param shards_controller: means to interact with storage + :param pools_controller: means to interact with storage """ - def __init__(self, shards_controller): - self._ctrl = shards_controller + def __init__(self, pools_controller): + self._ctrl = pools_controller def on_get(self, request, response, project_id): - """Returns a shard listing as objects embedded in an array: + """Returns a pool listing as objects embedded in an array: [ {"href": "", "weight": 100, "uri": ""}, @@ -66,7 +66,7 @@ class Listing(object): :returns: HTTP | [200, 204] """ - LOG.debug(u'LIST shards') + LOG.debug(u'LIST pools') store = {} request.get_param('marker', store=store) @@ -74,11 +74,11 @@ class Listing(object): request.get_param_as_bool('detailed', store=store) results = {} - results['shards'] = list(self._ctrl.list(**store)) - for entry in results['shards']: + results['pools'] = list(self._ctrl.list(**store)) + for entry in results['pools']: entry['href'] = request.path + '/' + entry.pop('name') - if not results['shards']: + if not results['pools']: response.status = falcon.HTTP_204 return @@ -88,12 +88,12 @@ class Listing(object): class Resource(object): - """A handler for individual shard. + """A handler for individual pool. - :param shards_controller: means to interact with storage + :param pools_controller: means to interact with storage """ - def __init__(self, shards_controller): - self._ctrl = shards_controller + def __init__(self, pools_controller): + self._ctrl = pools_controller validator_type = jsonschema.Draft4Validator self._validators = { 'weight': validator_type(schema.patch_weight), @@ -102,21 +102,21 @@ class Resource(object): 'create': validator_type(schema.create) } - def on_get(self, request, response, project_id, shard): - """Returns a JSON object for a single shard entry: + def on_get(self, request, response, project_id, pool): + """Returns a JSON object for a single pool entry: {"weight": 100, "uri": "", options: {...}} :returns: HTTP | [200, 404] """ - LOG.debug(u'GET shard - name: %s', shard) + LOG.debug(u'GET pool - name: %s', pool) data = None detailed = request.get_param_as_bool('detailed') or False try: - data = self._ctrl.get(shard, detailed) + data = self._ctrl.get(pool, detailed) - except errors.ShardDoesNotExist as ex: + except errors.PoolDoesNotExist as ex: LOG.debug(ex) raise falcon.HTTPNotFound() @@ -127,8 +127,8 @@ class Resource(object): response.body = transport_utils.to_json(data) response.content_location = request.relative_uri - def on_put(self, request, response, project_id, shard): - """Registers a new shard. Expects the following input: + def on_put(self, request, response, project_id, pool): + """Registers a new pool. Expects the following input: {"weight": 100, "uri": ""} @@ -136,7 +136,7 @@ class Resource(object): :returns: HTTP | [201, 204] """ - LOG.debug(u'PUT shard - name: %s', shard) + LOG.debug(u'PUT pool - name: %s', pool) data = utils.load(request) utils.validate(self._validators['create'], data) @@ -144,23 +144,23 @@ class Resource(object): raise wsgi_errors.HTTPBadRequestBody( 'cannot connect to %s' % data['uri'] ) - self._ctrl.create(shard, weight=data['weight'], + self._ctrl.create(pool, weight=data['weight'], uri=data['uri'], options=data.get('options', {})) response.status = falcon.HTTP_201 response.location = request.path - def on_delete(self, request, response, project_id, shard): - """Deregisters a shard. + def on_delete(self, request, response, project_id, pool): + """Deregisters a pool. :returns: HTTP | 204 """ - LOG.debug(u'DELETE shard - name: %s', shard) - self._ctrl.delete(shard) + LOG.debug(u'DELETE pool - name: %s', pool) + self._ctrl.delete(pool) response.status = falcon.HTTP_204 - def on_patch(self, request, response, project_id, shard): - """Allows one to update a shard's weight, uri, and/or options. + def on_patch(self, request, response, project_id, pool): + """Allows one to update a pool's weight, uri, and/or options. This method expects the user to submit a JSON object containing at least one of: 'uri', 'weight', 'options'. If @@ -171,12 +171,12 @@ class Resource(object): :returns: HTTP | 200,400 """ - LOG.debug(u'PATCH shard - name: %s', shard) + LOG.debug(u'PATCH pool - name: %s', pool) data = utils.load(request) EXPECT = ('weight', 'uri', 'options') if not any([(field in data) for field in EXPECT]): - LOG.debug(u'PATCH shard, bad params') + LOG.debug(u'PATCH pool, bad params') raise wsgi_errors.HTTPBadRequestBody( 'One of `uri`, `weight`, or `options` needs ' 'to be specified' @@ -193,7 +193,7 @@ class Resource(object): pred=lambda v: v is not None) try: - self._ctrl.update(shard, **fields) - except errors.ShardDoesNotExist as ex: + self._ctrl.update(pool, **fields) + except errors.PoolDoesNotExist as ex: LOG.exception(ex) raise falcon.HTTPNotFound() diff --git a/marconi/queues/transport/wsgi/v1_1/__init__.py b/marconi/queues/transport/wsgi/v1_1/__init__.py index d210406d4..97b7c35b1 100644 --- a/marconi/queues/transport/wsgi/v1_1/__init__.py +++ b/marconi/queues/transport/wsgi/v1_1/__init__.py @@ -18,8 +18,8 @@ from marconi.queues.transport.wsgi.v1_1 import homedoc from marconi.queues.transport.wsgi.v1_1 import messages from marconi.queues.transport.wsgi.v1_1 import metadata from marconi.queues.transport.wsgi.v1_1 import ping +from marconi.queues.transport.wsgi.v1_1 import pools from marconi.queues.transport.wsgi.v1_1 import queues -from marconi.queues.transport.wsgi.v1_1 import shards from marconi.queues.transport.wsgi.v1_1 import stats @@ -75,11 +75,11 @@ def public_endpoints(driver): def private_endpoints(driver): - shards_controller = driver._control.shards_controller + pools_controller = driver._control.pools_controller return [ - ('/shards', - shards.Listing(shards_controller)), - ('/shards/{shard}', - shards.Resource(shards_controller)), + ('/pools', + pools.Listing(pools_controller)), + ('/pools/{pool}', + pools.Resource(pools_controller)), ] diff --git a/marconi/queues/transport/wsgi/v1_1/shards.py b/marconi/queues/transport/wsgi/v1_1/pools.py similarity index 70% rename from marconi/queues/transport/wsgi/v1_1/shards.py rename to marconi/queues/transport/wsgi/v1_1/pools.py index 988e3ecf3..d59ac0165 100644 --- a/marconi/queues/transport/wsgi/v1_1/shards.py +++ b/marconi/queues/transport/wsgi/v1_1/pools.py @@ -13,10 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""shards: a resource to handle storage shard management +"""pools: a resource to handle storage pool management -A shard is added by an operator by interacting with the -sharding-related endpoints. When specifying a shard, the +A pool is added by an operator by interacting with the +pooling-related endpoints. When specifying a pool, the following fields are required: { @@ -25,7 +25,7 @@ following fields are required: "uri": string::uri } -Furthermore, depending on the underlying storage type of shard being +Furthermore, depending on the underlying storage type of pool being registered, there is an optional field: { @@ -36,7 +36,7 @@ registered, there is an optional field: import falcon import jsonschema -from marconi.common.schemas import shards as schema +from marconi.common.schemas import pools as schema from marconi.common.transport.wsgi import utils from marconi.common import utils as common_utils from marconi.openstack.common import log @@ -49,15 +49,15 @@ LOG = log.getLogger(__name__) class Listing(object): - """A resource to list registered shards + """A resource to list registered pools - :param shards_controller: means to interact with storage + :param pools_controller: means to interact with storage """ - def __init__(self, shards_controller): - self._ctrl = shards_controller + def __init__(self, pools_controller): + self._ctrl = pools_controller def on_get(self, request, response, project_id): - """Returns a shard listing as objects embedded in an array: + """Returns a pool listing as objects embedded in an array: [ {"href": "", "weight": 100, "uri": ""}, @@ -66,7 +66,7 @@ class Listing(object): :returns: HTTP | [200, 204] """ - LOG.debug(u'LIST shards') + LOG.debug(u'LIST pools') store = {} request.get_param('marker', store=store) @@ -74,11 +74,11 @@ class Listing(object): request.get_param_as_bool('detailed', store=store) results = {} - results['shards'] = list(self._ctrl.list(**store)) - for entry in results['shards']: + results['pools'] = list(self._ctrl.list(**store)) + for entry in results['pools']: entry['href'] = request.path + '/' + entry.pop('name') - if not results['shards']: + if not results['pools']: response.status = falcon.HTTP_204 return @@ -88,12 +88,12 @@ class Listing(object): class Resource(object): - """A handler for individual shard. + """A handler for individual pool. - :param shards_controller: means to interact with storage + :param pools_controller: means to interact with storage """ - def __init__(self, shards_controller): - self._ctrl = shards_controller + def __init__(self, pools_controller): + self._ctrl = pools_controller validator_type = jsonschema.Draft4Validator self._validators = { 'weight': validator_type(schema.patch_weight), @@ -102,21 +102,21 @@ class Resource(object): 'create': validator_type(schema.create) } - def on_get(self, request, response, project_id, shard): - """Returns a JSON object for a single shard entry: + def on_get(self, request, response, project_id, pool): + """Returns a JSON object for a single pool entry: {"weight": 100, "uri": "", options: {...}} :returns: HTTP | [200, 404] """ - LOG.debug(u'GET shard - name: %s', shard) + LOG.debug(u'GET pool - name: %s', pool) data = None detailed = request.get_param_as_bool('detailed') or False try: - data = self._ctrl.get(shard, detailed) + data = self._ctrl.get(pool, detailed) - except errors.ShardDoesNotExist as ex: + except errors.PoolDoesNotExist as ex: LOG.debug(ex) raise falcon.HTTPNotFound() @@ -127,8 +127,8 @@ class Resource(object): response.body = transport_utils.to_json(data) response.content_location = request.relative_uri - def on_put(self, request, response, project_id, shard): - """Registers a new shard. Expects the following input: + def on_put(self, request, response, project_id, pool): + """Registers a new pool. Expects the following input: {"weight": 100, "uri": ""} @@ -136,7 +136,7 @@ class Resource(object): :returns: HTTP | [201, 204] """ - LOG.debug(u'PUT shard - name: %s', shard) + LOG.debug(u'PUT pool - name: %s', pool) data = utils.load(request) utils.validate(self._validators['create'], data) @@ -144,23 +144,23 @@ class Resource(object): raise wsgi_errors.HTTPBadRequestBody( 'cannot connect to %s' % data['uri'] ) - self._ctrl.create(shard, weight=data['weight'], + self._ctrl.create(pool, weight=data['weight'], uri=data['uri'], options=data.get('options', {})) response.status = falcon.HTTP_201 response.location = request.path - def on_delete(self, request, response, project_id, shard): - """Deregisters a shard. + def on_delete(self, request, response, project_id, pool): + """Deregisters a pool. :returns: HTTP | 204 """ - LOG.debug(u'DELETE shard - name: %s', shard) - self._ctrl.delete(shard) + LOG.debug(u'DELETE pool - name: %s', pool) + self._ctrl.delete(pool) response.status = falcon.HTTP_204 - def on_patch(self, request, response, project_id, shard): - """Allows one to update a shard's weight, uri, and/or options. + def on_patch(self, request, response, project_id, pool): + """Allows one to update a pool's weight, uri, and/or options. This method expects the user to submit a JSON object containing at least one of: 'uri', 'weight', 'options'. If @@ -171,12 +171,12 @@ class Resource(object): :returns: HTTP | 200,400 """ - LOG.debug(u'PATCH shard - name: %s', shard) + LOG.debug(u'PATCH pool - name: %s', pool) data = utils.load(request) EXPECT = ('weight', 'uri', 'options') if not any([(field in data) for field in EXPECT]): - LOG.debug(u'PATCH shard, bad params') + LOG.debug(u'PATCH pool, bad params') raise wsgi_errors.HTTPBadRequestBody( 'One of `uri`, `weight`, or `options` needs ' 'to be specified' @@ -193,7 +193,7 @@ class Resource(object): pred=lambda v: v is not None) try: - self._ctrl.update(shard, **fields) - except errors.ShardDoesNotExist as ex: + self._ctrl.update(pool, **fields) + except errors.PoolDoesNotExist as ex: LOG.exception(ex) raise falcon.HTTPNotFound() diff --git a/marconi/tests/faulty_storage.py b/marconi/tests/faulty_storage.py index 083e19f9f..5cc2760f3 100644 --- a/marconi/tests/faulty_storage.py +++ b/marconi/tests/faulty_storage.py @@ -50,7 +50,7 @@ class ControlDriver(storage.ControlDriverBase): return None @property - def shards_controller(self): + def pools_controller(self): return None diff --git a/marconi/tests/helpers.py b/marconi/tests/helpers.py index c009db5b3..58d59cfb1 100644 --- a/marconi/tests/helpers.py +++ b/marconi/tests/helpers.py @@ -133,7 +133,7 @@ def entries(controller, count): @contextlib.contextmanager -def shard_entry(controller, project, queue, shard): +def pool_entry(controller, project, queue, pool): """Creates a catalogue entry with the given details, and deletes it once the context manager goes out of scope. @@ -143,18 +143,18 @@ def shard_entry(controller, project, queue, shard): :type project: six.text_type :param queue: name of queue :type queue: six.text_type - :param shard: an identifier for the shard - :type shard: six.text_type - :returns: (project, queue, shard) + :param pool: an identifier for the pool + :type pool: six.text_type + :returns: (project, queue, pool) :rtype: (six.text_type, six.text_type, six.text_type) """ - controller.insert(project, queue, shard) - yield (project, queue, shard) + controller.insert(project, queue, pool) + yield (project, queue, pool) controller.delete(project, queue) @contextlib.contextmanager -def shard_entries(controller, count): +def pool_entries(controller, count): """Creates `count` catalogue entries with the given details, and deletes them once the context manager goes out of scope. @@ -162,7 +162,7 @@ def shard_entries(controller, count): :type controller: queues.storage.base:CatalogueBase :param count: number of entries to create :type count: int - :returns: [(project, queue, shard)] + :returns: [(project, queue, pool)] :rtype: [(six.text_type, six.text_type, six.text_type)] """ spec = [(u'_', six.text_type(uuid.uuid1()), six.text_type(i)) diff --git a/marconi/tests/queues/storage/base.py b/marconi/tests/queues/storage/base.py index f99d33bcc..45bb43864 100644 --- a/marconi/tests/queues/storage/base.py +++ b/marconi/tests/queues/storage/base.py @@ -646,115 +646,115 @@ class ClaimControllerTest(ControllerBaseTest): project=self.project) -class ShardsControllerTest(ControllerBaseTest): - """Shards Controller base tests. +class PoolsControllerTest(ControllerBaseTest): + """Pools Controller base tests. NOTE(flaper87): Implementations of this class should override the tearDown method in order to clean up storage's state. """ - controller_base_class = storage.ShardsBase + controller_base_class = storage.PoolsBase def setUp(self): - super(ShardsControllerTest, self).setUp() - self.shards_controller = self.driver.shards_controller + super(PoolsControllerTest, self).setUp() + self.pools_controller = self.driver.pools_controller - # Let's create one shard - self.shard = str(uuid.uuid1()) - self.shards_controller.create(self.shard, 100, 'localhost', {}) + # Let's create one pool + self.pool = str(uuid.uuid1()) + self.pools_controller.create(self.pool, 100, 'localhost', {}) def tearDown(self): - self.shards_controller.drop_all() - super(ShardsControllerTest, self).tearDown() + self.pools_controller.drop_all() + super(PoolsControllerTest, self).tearDown() def test_create_succeeds(self): - self.shards_controller.create(str(uuid.uuid1()), - 100, 'localhost', {}) + self.pools_controller.create(str(uuid.uuid1()), + 100, 'localhost', {}) def test_create_replaces_on_duplicate_insert(self): name = str(uuid.uuid1()) - self.shards_controller.create(name, - 100, 'localhost', {}) - self.shards_controller.create(name, - 111, 'localhost2', {}) - entry = self.shards_controller.get(name) - self._shard_expects(entry, xname=name, xweight=111, - xlocation='localhost2') + self.pools_controller.create(name, + 100, 'localhost', {}) + self.pools_controller.create(name, + 111, 'localhost2', {}) + entry = self.pools_controller.get(name) + self._pool_expects(entry, xname=name, xweight=111, + xlocation='localhost2') - def _shard_expects(self, shard, xname, xweight, xlocation): - self.assertIn('name', shard) - self.assertEqual(shard['name'], xname) - self.assertIn('weight', shard) - self.assertEqual(shard['weight'], xweight) - self.assertIn('uri', shard) - self.assertEqual(shard['uri'], xlocation) + def _pool_expects(self, pool, xname, xweight, xlocation): + self.assertIn('name', pool) + self.assertEqual(pool['name'], xname) + self.assertIn('weight', pool) + self.assertEqual(pool['weight'], xweight) + self.assertIn('uri', pool) + self.assertEqual(pool['uri'], xlocation) def test_get_returns_expected_content(self): - res = self.shards_controller.get(self.shard) - self._shard_expects(res, self.shard, 100, 'localhost') + res = self.pools_controller.get(self.pool) + self._pool_expects(res, self.pool, 100, 'localhost') self.assertNotIn('options', res) def test_detailed_get_returns_expected_content(self): - res = self.shards_controller.get(self.shard, detailed=True) + res = self.pools_controller.get(self.pool, detailed=True) self.assertIn('options', res) self.assertEqual(res['options'], {}) def test_get_raises_if_not_found(self): - self.assertRaises(storage.errors.ShardDoesNotExist, - self.shards_controller.get, 'notexists') + self.assertRaises(storage.errors.PoolDoesNotExist, + self.pools_controller.get, 'notexists') def test_exists(self): - self.assertTrue(self.shards_controller.exists(self.shard)) - self.assertFalse(self.shards_controller.exists('notexists')) + self.assertTrue(self.pools_controller.exists(self.pool)) + self.assertFalse(self.pools_controller.exists('notexists')) def test_update_raises_assertion_error_on_bad_fields(self): - self.assertRaises(AssertionError, self.shards_controller.update, - self.shard) + self.assertRaises(AssertionError, self.pools_controller.update, + self.pool) def test_update_works(self): - self.shards_controller.update(self.shard, weight=101, - uri='redis://localhost', - options={'a': 1}) - res = self.shards_controller.get(self.shard, detailed=True) - self._shard_expects(res, self.shard, 101, 'redis://localhost') + self.pools_controller.update(self.pool, weight=101, + uri='redis://localhost', + options={'a': 1}) + res = self.pools_controller.get(self.pool, detailed=True) + self._pool_expects(res, self.pool, 101, 'redis://localhost') self.assertEqual(res['options'], {'a': 1}) def test_delete_works(self): - self.shards_controller.delete(self.shard) - self.assertFalse(self.shards_controller.exists(self.shard)) + self.pools_controller.delete(self.pool) + self.assertFalse(self.pools_controller.exists(self.pool)) def test_delete_nonexistent_is_silent(self): - self.shards_controller.delete('nonexisting') + self.pools_controller.delete('nonexisting') def test_drop_all_leads_to_empty_listing(self): - self.shards_controller.drop_all() - cursor = self.shards_controller.list() + self.pools_controller.drop_all() + cursor = self.pools_controller.list() self.assertRaises(StopIteration, next, cursor) def test_listing_simple(self): # NOTE(cpp-cabrera): base entry interferes with listing results - self.shards_controller.delete(self.shard) + self.pools_controller.delete(self.pool) name_gen = lambda i: chr(ord('A') + i) for i in range(15): - self.shards_controller.create(name_gen(i), i, str(i), {}) + self.pools_controller.create(name_gen(i), i, str(i), {}) - res = list(self.shards_controller.list()) + res = list(self.pools_controller.list()) self.assertEqual(len(res), 10) for i, entry in enumerate(res): - self._shard_expects(entry, name_gen(i), i, str(i)) + self._pool_expects(entry, name_gen(i), i, str(i)) self.assertNotIn('options', entry) - res = list(self.shards_controller.list(limit=5)) + res = list(self.pools_controller.list(limit=5)) self.assertEqual(len(res), 5) - res = next(self.shards_controller.list(marker=name_gen(3))) - self._shard_expects(res, name_gen(4), 4, '4') + res = next(self.pools_controller.list(marker=name_gen(3))) + self._pool_expects(res, name_gen(4), 4, '4') - res = list(self.shards_controller.list(detailed=True)) + res = list(self.pools_controller.list(detailed=True)) self.assertEqual(len(res), 10) for i, entry in enumerate(res): - self._shard_expects(entry, name_gen(i), i, str(i)) + self._pool_expects(entry, name_gen(i), i, str(i)) self.assertIn('options', entry) self.assertEqual(entry['options'], {}) @@ -775,15 +775,15 @@ class CatalogueControllerTest(ControllerBaseTest): def _check_structure(self, entry): self.assertIn('queue', entry) self.assertIn('project', entry) - self.assertIn('shard', entry) + self.assertIn('pool', entry) self.assertIsInstance(entry['queue'], six.text_type) self.assertIsInstance(entry['project'], six.text_type) - self.assertIsInstance(entry['shard'], six.text_type) + self.assertIsInstance(entry['pool'], six.text_type) - def _check_value(self, entry, xqueue, xproject, xshard): + def _check_value(self, entry, xqueue, xproject, xpool): self.assertEqual(entry['queue'], xqueue) self.assertEqual(entry['project'], xproject) - self.assertEqual(entry['shard'], xshard) + self.assertEqual(entry['pool'], xpool) def test_catalogue_entry_life_cycle(self): queue = self.queue @@ -794,13 +794,13 @@ class CatalogueControllerTest(ControllerBaseTest): self.fail('There should be no entries at this time') # create a listing, check its length - with helpers.shard_entries(self.controller, 10) as expect: + with helpers.pool_entries(self.controller, 10) as expect: project = expect[0][0] xs = list(self.controller.list(project)) self.assertEqual(len(xs), 10) # create, check existence, delete - with helpers.shard_entry(self.controller, project, queue, u'a'): + with helpers.pool_entry(self.controller, project, queue, u'a'): self.assertTrue(self.controller.exists(project, queue)) # verify it no longer exists @@ -810,20 +810,20 @@ class CatalogueControllerTest(ControllerBaseTest): self.assertEqual(len(list(self.controller.list(project))), 0) def test_list(self): - with helpers.shard_entries(self.controller, 10) as expect: + with helpers.pool_entries(self.controller, 10) as expect: values = zip(self.controller.list(u'_'), expect) for e, x in values: p, q, s = x self._check_structure(e) - self._check_value(e, xqueue=q, xproject=p, xshard=s) + self._check_value(e, xqueue=q, xproject=p, xpool=s) def test_update(self): - with helpers.shard_entry(self.controller, self.project, - self.queue, u'a') as expect: + with helpers.pool_entry(self.controller, self.project, + self.queue, u'a') as expect: p, q, s = expect - self.controller.update(p, q, shard=u'b') + self.controller.update(p, q, pool=u'b') entry = self.controller.get(p, q) - self._check_value(entry, xqueue=q, xproject=p, xshard=u'b') + self._check_value(entry, xqueue=q, xproject=p, xpool=u'b') def test_update_raises_when_entry_does_not_exist(self): self.assertRaises(errors.QueueNotMapped, @@ -831,17 +831,17 @@ class CatalogueControllerTest(ControllerBaseTest): 'not', 'not', 'a') def test_get(self): - with helpers.shard_entry(self.controller, - self.project, - self.queue, u'a') as expect: + with helpers.pool_entry(self.controller, + self.project, + self.queue, u'a') as expect: p, q, s = expect e = self.controller.get(p, q) - self._check_value(e, xqueue=q, xproject=p, xshard=s) + self._check_value(e, xqueue=q, xproject=p, xpool=s) def test_get_raises_if_does_not_exist(self): - with helpers.shard_entry(self.controller, - self.project, - self.queue, u'a') as expect: + with helpers.pool_entry(self.controller, + self.project, + self.queue, u'a') as expect: p, q, _ = expect self.assertRaises(errors.QueueNotMapped, self.controller.get, @@ -854,12 +854,12 @@ class CatalogueControllerTest(ControllerBaseTest): 'non_existing', 'non_existing') def test_exists(self): - with helpers.shard_entry(self.controller, - self.project, - self.queue, u'a') as expect: + with helpers.pool_entry(self.controller, + self.project, + self.queue, u'a') as expect: p, q, _ = expect self.assertTrue(self.controller.exists(p, q)) - self.assertFalse(self.controller.exists('nada', 'not_here')) + self.assertFalse(self.controller.exists('nada', 'not_here')) def _insert_fixtures(controller, queue_name, project=None, diff --git a/marconi/tests/queues/transport/wsgi/base.py b/marconi/tests/queues/transport/wsgi/base.py index 0645ba3c0..09b73ffeb 100644 --- a/marconi/tests/queues/transport/wsgi/base.py +++ b/marconi/tests/queues/transport/wsgi/base.py @@ -47,8 +47,8 @@ class TestBase(testing.TestBase): self.srmock = ftest.StartResponseMock() def tearDown(self): - if self.conf.sharding: - self.boot.control.shards_controller.drop_all() + if self.conf.pooling: + self.boot.control.pools_controller.drop_all() self.boot.control.catalogue_controller.drop_all() super(TestBase, self).tearDown() diff --git a/marconi/tests/queues/transport/wsgi/v1/__init__.py b/marconi/tests/queues/transport/wsgi/v1/__init__.py index e8ccd107c..b421a15c0 100644 --- a/marconi/tests/queues/transport/wsgi/v1/__init__.py +++ b/marconi/tests/queues/transport/wsgi/v1/__init__.py @@ -18,8 +18,8 @@ from marconi.tests.queues.transport.wsgi.v1 import test_default_limits from marconi.tests.queues.transport.wsgi.v1 import test_home from marconi.tests.queues.transport.wsgi.v1 import test_media_type from marconi.tests.queues.transport.wsgi.v1 import test_messages -from marconi.tests.queues.transport.wsgi.v1 import test_queue_lifecycle as lc -from marconi.tests.queues.transport.wsgi.v1 import test_shards +from marconi.tests.queues.transport.wsgi.v1 import test_pools +from marconi.tests.queues.transport.wsgi.v1 import test_queue_lifecycle as l TestAuth = test_auth.TestAuth TestClaimsFaultyDriver = test_claims.TestClaimsFaultyDriver @@ -30,10 +30,10 @@ TestHomeDocument = test_home.TestHomeDocument TestMediaType = test_media_type.TestMediaType TestMessagesFaultyDriver = test_messages.TestMessagesFaultyDriver TestMessagesMongoDB = test_messages.TestMessagesMongoDB -TestMessagesMongoDBSharded = test_messages.TestMessagesMongoDBSharded +TestMessagesMongoDBPooled = test_messages.TestMessagesMongoDBPooled TestMessagesSqlalchemy = test_messages.TestMessagesSqlalchemy -TestQueueFaultyDriver = lc.TestQueueLifecycleFaultyDriver -TestQueueLifecycleMongoDB = lc.TestQueueLifecycleMongoDB -TestQueueLifecycleSqlalchemy = lc.TestQueueLifecycleSqlalchemy -TestShardsMongoDB = test_shards.TestShardsMongoDB -TestShardsSqlalchemy = test_shards.TestShardsSqlalchemy +TestQueueFaultyDriver = l.TestQueueLifecycleFaultyDriver +TestQueueLifecycleMongoDB = l.TestQueueLifecycleMongoDB +TestQueueLifecycleSqlalchemy = l.TestQueueLifecycleSqlalchemy +TestPoolsMongoDB = test_pools.TestPoolsMongoDB +TestPoolsSqlalchemy = test_pools.TestPoolsSqlalchemy diff --git a/marconi/tests/queues/transport/wsgi/v1/test_messages.py b/marconi/tests/queues/transport/wsgi/v1/test_messages.py index b1c28fa87..5f84a2459 100644 --- a/marconi/tests/queues/transport/wsgi/v1/test_messages.py +++ b/marconi/tests/queues/transport/wsgi/v1/test_messages.py @@ -36,11 +36,11 @@ class MessagesBaseTest(base.V1Base): def setUp(self): super(MessagesBaseTest, self).setUp() - if self.conf.sharding: + if self.conf.pooling: for i in range(4): uri = self.conf['drivers:storage:mongodb'].uri doc = {'weight': 100, 'uri': uri} - self.simulate_put(self.url_prefix + '/shards/' + str(i), + self.simulate_put(self.url_prefix + '/pools/' + str(i), body=jsonutils.dumps(doc)) self.assertEqual(self.srmock.status, falcon.HTTP_201) @@ -61,9 +61,9 @@ class MessagesBaseTest(base.V1Base): def tearDown(self): self.simulate_delete(self.queue_path, self.project_id) - if self.conf.sharding: + if self.conf.pooling: for i in range(4): - self.simulate_delete(self.url_prefix + '/shards/' + str(i)) + self.simulate_delete(self.url_prefix + '/pools/' + str(i)) super(MessagesBaseTest, self).tearDown() @@ -473,21 +473,21 @@ class TestMessagesMongoDB(MessagesBaseTest): super(TestMessagesMongoDB, self).tearDown() -class TestMessagesMongoDBSharded(MessagesBaseTest): +class TestMessagesMongoDBPooled(MessagesBaseTest): - config_file = 'wsgi_mongodb_sharded.conf' + config_file = 'wsgi_mongodb_pooled.conf' @testing.requires_mongodb def setUp(self): - super(TestMessagesMongoDBSharded, self).setUp() + super(TestMessagesMongoDBPooled, self).setUp() def tearDown(self): - super(TestMessagesMongoDBSharded, self).tearDown() + super(TestMessagesMongoDBPooled, self).tearDown() - # TODO(cpp-cabrera): remove this skipTest once sharded queue + # TODO(cpp-cabrera): remove this skipTest once pooled queue # listing is implemented def test_list(self): - self.skipTest("Need to implement sharded queue listing.") + self.skipTest("Need to implement pooled queue listing.") class TestMessagesFaultyDriver(base.V1BaseFaulty): diff --git a/marconi/tests/queues/transport/wsgi/v1/test_shards.py b/marconi/tests/queues/transport/wsgi/v1/test_pools.py similarity index 66% rename from marconi/tests/queues/transport/wsgi/v1/test_shards.py rename to marconi/tests/queues/transport/wsgi/v1/test_pools.py index 3aeb559dd..10470bcd4 100644 --- a/marconi/tests/queues/transport/wsgi/v1/test_shards.py +++ b/marconi/tests/queues/transport/wsgi/v1/test_pools.py @@ -25,13 +25,13 @@ from marconi.tests.queues.transport.wsgi import base @contextlib.contextmanager -def shard(test, name, weight, uri, options={}): - """A context manager for constructing a shard for use in testing. +def pool(test, name, weight, uri, options={}): + """A context manager for constructing a pool for use in testing. - Deletes the shard after exiting the context. + Deletes the pool after exiting the context. :param test: Must expose simulate_* methods - :param name: Name for this shard + :param name: Name for this pool :type name: six.text_type :type weight: int :type uri: six.text_type @@ -40,7 +40,7 @@ def shard(test, name, weight, uri, options={}): :rtype: see above """ doc = {'weight': weight, 'uri': uri, 'options': options} - path = test.url_prefix + '/shards/' + name + path = test.url_prefix + '/pools/' + name test.simulate_put(path, body=jsonutils.dumps(doc)) @@ -52,18 +52,18 @@ def shard(test, name, weight, uri, options={}): @contextlib.contextmanager -def shards(test, count, uri): - """A context manager for constructing shards for use in testing. +def pools(test, count, uri): + """A context manager for constructing pools for use in testing. - Deletes the shards after exiting the context. + Deletes the pools after exiting the context. :param test: Must expose simulate_* methods - :param count: Number of shards to create + :param count: Number of pools to create :type count: int :returns: (paths, weights, uris, options) :rtype: ([six.text_type], [int], [six.text_type], [dict]) """ - base = test.url_prefix + '/shards/' + base = test.url_prefix + '/pools/' args = [(base + str(i), i, {str(i): i}) for i in range(count)] @@ -79,28 +79,28 @@ def shards(test, count, uri): @ddt.ddt -class ShardsBaseTest(base.V1Base): +class PoolsBaseTest(base.V1Base): def setUp(self): - super(ShardsBaseTest, self).setUp() + super(PoolsBaseTest, self).setUp() self.doc = {'weight': 100, 'uri': 'sqlite://:memory:'} - self.shard = self.url_prefix + '/shards/' + str(uuid.uuid1()) - self.simulate_put(self.shard, body=jsonutils.dumps(self.doc)) + self.pool = self.url_prefix + '/pools/' + str(uuid.uuid1()) + self.simulate_put(self.pool, body=jsonutils.dumps(self.doc)) self.assertEqual(self.srmock.status, falcon.HTTP_201) def tearDown(self): - super(ShardsBaseTest, self).tearDown() - self.simulate_delete(self.shard) + super(PoolsBaseTest, self).tearDown() + self.simulate_delete(self.pool) self.assertEqual(self.srmock.status, falcon.HTTP_204) - def test_put_shard_works(self): + def test_put_pool_works(self): name = str(uuid.uuid1()) weight, uri = self.doc['weight'], self.doc['uri'] - with shard(self, name, weight, uri): + with pool(self, name, weight, uri): self.assertEqual(self.srmock.status, falcon.HTTP_201) def test_put_raises_if_missing_fields(self): - path = self.url_prefix + '/shards/' + str(uuid.uuid1()) + path = self.url_prefix + '/pools/' + str(uuid.uuid1()) self.simulate_put(path, body=jsonutils.dumps({'weight': 100})) self.assertEqual(self.srmock.status, falcon.HTTP_400) @@ -111,7 +111,7 @@ class ShardsBaseTest(base.V1Base): @ddt.data(-1, 2**32+1, 'big') def test_put_raises_if_invalid_weight(self, weight): - path = self.url_prefix + '/shards/' + str(uuid.uuid1()) + path = self.url_prefix + '/pools/' + str(uuid.uuid1()) doc = {'weight': weight, 'uri': 'a'} self.simulate_put(path, body=jsonutils.dumps(doc)) @@ -119,84 +119,84 @@ class ShardsBaseTest(base.V1Base): @ddt.data(-1, 2**32+1, [], 'localhost:27017') def test_put_raises_if_invalid_uri(self, uri): - path = self.url_prefix + '/shards/' + str(uuid.uuid1()) + path = self.url_prefix + '/pools/' + str(uuid.uuid1()) self.simulate_put(path, body=jsonutils.dumps({'weight': 1, 'uri': uri})) self.assertEqual(self.srmock.status, falcon.HTTP_400) @ddt.data(-1, 'wee', []) def test_put_raises_if_invalid_options(self, options): - path = self.url_prefix + '/shards/' + str(uuid.uuid1()) + path = self.url_prefix + '/pools/' + str(uuid.uuid1()) doc = {'weight': 1, 'uri': 'a', 'options': options} self.simulate_put(path, body=jsonutils.dumps(doc)) self.assertEqual(self.srmock.status, falcon.HTTP_400) def test_put_existing_overwrites(self): - # NOTE(cabrera): setUp creates default shard + # NOTE(cabrera): setUp creates default pool expect = self.doc - self.simulate_put(self.shard, + self.simulate_put(self.pool, body=jsonutils.dumps(expect)) self.assertEqual(self.srmock.status, falcon.HTTP_201) - result = self.simulate_get(self.shard) + result = self.simulate_get(self.pool) self.assertEqual(self.srmock.status, falcon.HTTP_200) doc = jsonutils.loads(result[0]) self.assertEqual(doc['weight'], expect['weight']) self.assertEqual(doc['uri'], expect['uri']) def test_delete_works(self): - self.simulate_delete(self.shard) + self.simulate_delete(self.pool) self.assertEqual(self.srmock.status, falcon.HTTP_204) - self.simulate_get(self.shard) + self.simulate_get(self.pool) self.assertEqual(self.srmock.status, falcon.HTTP_404) def test_get_nonexisting_raises_404(self): - self.simulate_get(self.url_prefix + '/shards/nonexisting') + self.simulate_get(self.url_prefix + '/pools/nonexisting') self.assertEqual(self.srmock.status, falcon.HTTP_404) - def _shard_expect(self, shard, xhref, xweight, xuri): - self.assertIn('href', shard) - self.assertEqual(shard['href'], xhref) - self.assertIn('weight', shard) - self.assertEqual(shard['weight'], xweight) - self.assertIn('uri', shard) - self.assertEqual(shard['uri'], xuri) + def _pool_expect(self, pool, xhref, xweight, xuri): + self.assertIn('href', pool) + self.assertEqual(pool['href'], xhref) + self.assertIn('weight', pool) + self.assertEqual(pool['weight'], xweight) + self.assertIn('uri', pool) + self.assertEqual(pool['uri'], xuri) def test_get_works(self): - result = self.simulate_get(self.shard) + result = self.simulate_get(self.pool) self.assertEqual(self.srmock.status, falcon.HTTP_200) - shard = jsonutils.loads(result[0]) - self._shard_expect(shard, self.shard, self.doc['weight'], - self.doc['uri']) + pool = jsonutils.loads(result[0]) + self._pool_expect(pool, self.pool, self.doc['weight'], + self.doc['uri']) def test_detailed_get_works(self): - result = self.simulate_get(self.shard, + result = self.simulate_get(self.pool, query_string='?detailed=True') self.assertEqual(self.srmock.status, falcon.HTTP_200) - shard = jsonutils.loads(result[0]) - self._shard_expect(shard, self.shard, self.doc['weight'], - self.doc['uri']) - self.assertIn('options', shard) - self.assertEqual(shard['options'], {}) + pool = jsonutils.loads(result[0]) + self._pool_expect(pool, self.pool, self.doc['weight'], + self.doc['uri']) + self.assertIn('options', pool) + self.assertEqual(pool['options'], {}) def test_patch_raises_if_missing_fields(self): - self.simulate_patch(self.shard, + self.simulate_patch(self.pool, body=jsonutils.dumps({'location': 1})) self.assertEqual(self.srmock.status, falcon.HTTP_400) def _patch_test(self, doc): - self.simulate_patch(self.shard, + self.simulate_patch(self.pool, body=jsonutils.dumps(doc)) self.assertEqual(self.srmock.status, falcon.HTTP_200) - result = self.simulate_get(self.shard, + result = self.simulate_get(self.pool, query_string='?detailed=True') self.assertEqual(self.srmock.status, falcon.HTTP_200) - shard = jsonutils.loads(result[0]) - self._shard_expect(shard, self.shard, doc['weight'], - doc['uri']) - self.assertEqual(shard['options'], doc['options']) + pool = jsonutils.loads(result[0]) + self._pool_expect(pool, self.pool, doc['weight'], + doc['uri']) + self.assertEqual(pool['options'], doc['options']) def test_patch_works(self): doc = {'weight': 101, 'uri': 'sqlite://:memory:', 'options': {'a': 1}} @@ -209,60 +209,60 @@ class ShardsBaseTest(base.V1Base): @ddt.data(-1, 2**32+1, 'big') def test_patch_raises_400_on_invalid_weight(self, weight): - self.simulate_patch(self.shard, + self.simulate_patch(self.pool, body=jsonutils.dumps({'weight': weight})) self.assertEqual(self.srmock.status, falcon.HTTP_400) @ddt.data(-1, 2**32+1, [], 'localhost:27017') def test_patch_raises_400_on_invalid_uri(self, uri): - self.simulate_patch(self.shard, + self.simulate_patch(self.pool, body=jsonutils.dumps({'uri': uri})) self.assertEqual(self.srmock.status, falcon.HTTP_400) @ddt.data(-1, 'wee', []) def test_patch_raises_400_on_invalid_options(self, options): - self.simulate_patch(self.shard, + self.simulate_patch(self.pool, body=jsonutils.dumps({'options': options})) self.assertEqual(self.srmock.status, falcon.HTTP_400) - def test_patch_raises_404_if_shard_not_found(self): - self.simulate_patch(self.url_prefix + '/shards/notexists', + def test_patch_raises_404_if_pool_not_found(self): + self.simulate_patch(self.url_prefix + '/pools/notexists', body=jsonutils.dumps({'weight': 1})) self.assertEqual(self.srmock.status, falcon.HTTP_404) def test_empty_listing_returns_204(self): - self.simulate_delete(self.shard) - self.simulate_get(self.url_prefix + '/shards') + self.simulate_delete(self.pool) + self.simulate_get(self.url_prefix + '/pools') self.assertEqual(self.srmock.status, falcon.HTTP_204) def _listing_test(self, count=10, limit=10, marker=None, detailed=False): - # NOTE(cpp-cabrera): delete initial shard - it will interfere + # NOTE(cpp-cabrera): delete initial pool - it will interfere # with listing tests - self.simulate_delete(self.shard) + self.simulate_delete(self.pool) query = '?limit={0}&detailed={1}'.format(limit, detailed) if marker: query += '&marker={2}'.format(marker) - with shards(self, count, self.doc['uri']) as expected: - result = self.simulate_get(self.url_prefix + '/shards', + with pools(self, count, self.doc['uri']) as expected: + result = self.simulate_get(self.url_prefix + '/pools', query_string=query) self.assertEqual(self.srmock.status, falcon.HTTP_200) results = jsonutils.loads(result[0]) self.assertIsInstance(results, dict) - self.assertIn('shards', results) - shard_list = results['shards'] - self.assertEqual(len(shard_list), min(limit, count)) - for s in shard_list: + self.assertIn('pools', results) + pool_list = results['pools'] + self.assertEqual(len(pool_list), min(limit, count)) + for s in pool_list: # NOTE(flwang): It can't assumed that both sqlalchemy and # mongodb can return query result with the same order. Just # like the order they're inserted. Actually, sqlalchemy can't # guarantee that. So we're leveraging the relationship between - # shard weight and the index of shards fixture to get the - # right shard to verify. + # pool weight and the index of pools fixture to get the + # right pool to verify. expect = expected[s['weight']] path, weight = expect[:2] - self._shard_expect(s, path, weight, self.doc['uri']) + self._pool_expect(s, path, weight, self.doc['uri']) if detailed: self.assertIn('options', s) self.assertEqual(s['options'], expect[-1]) @@ -280,30 +280,30 @@ class ShardsBaseTest(base.V1Base): self._listing_test(count=15, limit=limit) def test_listing_marker_is_respected(self): - self.simulate_delete(self.shard) + self.simulate_delete(self.pool) - with shards(self, 10, self.doc['uri']) as expected: - result = self.simulate_get(self.url_prefix + '/shards', + with pools(self, 10, self.doc['uri']) as expected: + result = self.simulate_get(self.url_prefix + '/pools', query_string='?marker=3') self.assertEqual(self.srmock.status, falcon.HTTP_200) - shard_list = jsonutils.loads(result[0])['shards'] - self.assertEqual(len(shard_list), 6) + pool_list = jsonutils.loads(result[0])['pools'] + self.assertEqual(len(pool_list), 6) path, weight = expected[4][:2] - self._shard_expect(shard_list[0], path, weight, self.doc['uri']) + self._pool_expect(pool_list[0], path, weight, self.doc['uri']) -class TestShardsMongoDB(ShardsBaseTest): +class TestPoolsMongoDB(PoolsBaseTest): config_file = 'wsgi_mongodb.conf' @testing.requires_mongodb def setUp(self): - super(TestShardsMongoDB, self).setUp() + super(TestPoolsMongoDB, self).setUp() -class TestShardsSqlalchemy(ShardsBaseTest): +class TestPoolsSqlalchemy(PoolsBaseTest): config_file = 'wsgi_sqlalchemy.conf' def setUp(self): - super(TestShardsSqlalchemy, self).setUp() + super(TestPoolsSqlalchemy, self).setUp() diff --git a/marconi/tests/queues/transport/wsgi/v1_1/__init__.py b/marconi/tests/queues/transport/wsgi/v1_1/__init__.py index 31b611291..9c2a7e81a 100644 --- a/marconi/tests/queues/transport/wsgi/v1_1/__init__.py +++ b/marconi/tests/queues/transport/wsgi/v1_1/__init__.py @@ -18,8 +18,8 @@ from marconi.tests.queues.transport.wsgi.v1_1 import test_default_limits from marconi.tests.queues.transport.wsgi.v1_1 import test_home from marconi.tests.queues.transport.wsgi.v1_1 import test_media_type from marconi.tests.queues.transport.wsgi.v1_1 import test_messages -from marconi.tests.queues.transport.wsgi.v1_1 import test_queue_lifecycle as lc -from marconi.tests.queues.transport.wsgi.v1_1 import test_shards +from marconi.tests.queues.transport.wsgi.v1_1 import test_pools +from marconi.tests.queues.transport.wsgi.v1_1 import test_queue_lifecycle as l TestAuth = test_auth.TestAuth TestClaimsFaultyDriver = test_claims.TestClaimsFaultyDriver @@ -30,10 +30,10 @@ TestHomeDocument = test_home.TestHomeDocument TestMediaType = test_media_type.TestMediaType TestMessagesFaultyDriver = test_messages.TestMessagesFaultyDriver TestMessagesMongoDB = test_messages.TestMessagesMongoDB -TestMessagesMongoDBSharded = test_messages.TestMessagesMongoDBSharded +TestMessagesMongoDBPooled = test_messages.TestMessagesMongoDBPooled TestMessagesSqlalchemy = test_messages.TestMessagesSqlalchemy -TestQueueFaultyDriver = lc.TestQueueLifecycleFaultyDriver -TestQueueLifecycleMongoDB = lc.TestQueueLifecycleMongoDB -TestQueueLifecycleSqlalchemy = lc.TestQueueLifecycleSqlalchemy -TestShardsMongoDB = test_shards.TestShardsMongoDB -TestShardsSqlalchemy = test_shards.TestShardsSqlalchemy +TestQueueFaultyDriver = l.TestQueueLifecycleFaultyDriver +TestQueueLifecycleMongoDB = l.TestQueueLifecycleMongoDB +TestQueueLifecycleSqlalchemy = l.TestQueueLifecycleSqlalchemy +TestPoolsMongoDB = test_pools.TestPoolsMongoDB +TestPoolsSqlalchemy = test_pools.TestPoolsSqlalchemy diff --git a/marconi/tests/queues/transport/wsgi/v1_1/test_messages.py b/marconi/tests/queues/transport/wsgi/v1_1/test_messages.py index 810243702..3519e9eac 100644 --- a/marconi/tests/queues/transport/wsgi/v1_1/test_messages.py +++ b/marconi/tests/queues/transport/wsgi/v1_1/test_messages.py @@ -34,11 +34,11 @@ class MessagesBaseTest(base.V1_1Base): def setUp(self): super(MessagesBaseTest, self).setUp() - if self.conf.sharding: + if self.conf.pooling: for i in range(4): uri = self.conf['drivers:storage:mongodb'].uri doc = {'weight': 100, 'uri': uri} - self.simulate_put(self.url_prefix + '/shards/' + str(i), + self.simulate_put(self.url_prefix + '/pools/' + str(i), body=jsonutils.dumps(doc)) self.assertEqual(self.srmock.status, falcon.HTTP_201) @@ -59,9 +59,9 @@ class MessagesBaseTest(base.V1_1Base): def tearDown(self): self.simulate_delete(self.queue_path, headers=self.headers) - if self.conf.sharding: + if self.conf.pooling: for i in range(4): - self.simulate_delete(self.url_prefix + '/shards/' + str(i), + self.simulate_delete(self.url_prefix + '/pools/' + str(i), headers=self.headers) super(MessagesBaseTest, self).tearDown() @@ -480,20 +480,20 @@ class TestMessagesMongoDB(MessagesBaseTest): super(TestMessagesMongoDB, self).tearDown() -class TestMessagesMongoDBSharded(MessagesBaseTest): - config_file = 'wsgi_mongodb_sharded.conf' +class TestMessagesMongoDBPooled(MessagesBaseTest): + config_file = 'wsgi_mongodb_pooled.conf' @testing.requires_mongodb def setUp(self): - super(TestMessagesMongoDBSharded, self).setUp() + super(TestMessagesMongoDBPooled, self).setUp() def tearDown(self): - super(TestMessagesMongoDBSharded, self).tearDown() + super(TestMessagesMongoDBPooled, self).tearDown() - # TODO(cpp-cabrera): remove this skipTest once sharded queue + # TODO(cpp-cabrera): remove this skipTest once pooled queue # listing is implemented def test_list(self): - self.skipTest("Need to implement sharded queue listing.") + self.skipTest("Need to implement pooled queue listing.") class TestMessagesFaultyDriver(base.V1_1BaseFaulty): diff --git a/marconi/tests/queues/transport/wsgi/v1_1/test_shards.py b/marconi/tests/queues/transport/wsgi/v1_1/test_pools.py similarity index 66% rename from marconi/tests/queues/transport/wsgi/v1_1/test_shards.py rename to marconi/tests/queues/transport/wsgi/v1_1/test_pools.py index 27e170723..83bb81e53 100644 --- a/marconi/tests/queues/transport/wsgi/v1_1/test_shards.py +++ b/marconi/tests/queues/transport/wsgi/v1_1/test_pools.py @@ -24,13 +24,13 @@ from marconi.tests.queues.transport.wsgi import base @contextlib.contextmanager -def shard(test, name, weight, uri, options={}): - """A context manager for constructing a shard for use in testing. +def pool(test, name, weight, uri, options={}): + """A context manager for constructing a pool for use in testing. - Deletes the shard after exiting the context. + Deletes the pool after exiting the context. :param test: Must expose simulate_* methods - :param name: Name for this shard + :param name: Name for this pool :type name: six.text_type :type weight: int :type uri: six.text_type @@ -39,7 +39,7 @@ def shard(test, name, weight, uri, options={}): :rtype: see above """ doc = {'weight': weight, 'uri': uri, 'options': options} - path = test.url_prefix + '/shards/' + name + path = test.url_prefix + '/pools/' + name test.simulate_put(path, body=jsonutils.dumps(doc)) @@ -51,18 +51,18 @@ def shard(test, name, weight, uri, options={}): @contextlib.contextmanager -def shards(test, count, uri): - """A context manager for constructing shards for use in testing. +def pools(test, count, uri): + """A context manager for constructing pools for use in testing. - Deletes the shards after exiting the context. + Deletes the pools after exiting the context. :param test: Must expose simulate_* methods - :param count: Number of shards to create + :param count: Number of pools to create :type count: int :returns: (paths, weights, uris, options) :rtype: ([six.text_type], [int], [six.text_type], [dict]) """ - base = test.url_prefix + '/shards/' + base = test.url_prefix + '/pools/' args = [(base + str(i), i, {str(i): i}) for i in range(count)] @@ -78,28 +78,28 @@ def shards(test, count, uri): @ddt.ddt -class ShardsBaseTest(base.V1_1Base): +class PoolsBaseTest(base.V1_1Base): def setUp(self): - super(ShardsBaseTest, self).setUp() + super(PoolsBaseTest, self).setUp() self.doc = {'weight': 100, 'uri': 'sqlite://:memory:'} - self.shard = self.url_prefix + '/shards/' + str(uuid.uuid1()) - self.simulate_put(self.shard, body=jsonutils.dumps(self.doc)) + self.pool = self.url_prefix + '/pools/' + str(uuid.uuid1()) + self.simulate_put(self.pool, body=jsonutils.dumps(self.doc)) self.assertEqual(self.srmock.status, falcon.HTTP_201) def tearDown(self): - super(ShardsBaseTest, self).tearDown() - self.simulate_delete(self.shard) + super(PoolsBaseTest, self).tearDown() + self.simulate_delete(self.pool) self.assertEqual(self.srmock.status, falcon.HTTP_204) - def test_put_shard_works(self): + def test_put_pool_works(self): name = str(uuid.uuid1()) weight, uri = self.doc['weight'], self.doc['uri'] - with shard(self, name, weight, uri): + with pool(self, name, weight, uri): self.assertEqual(self.srmock.status, falcon.HTTP_201) def test_put_raises_if_missing_fields(self): - path = self.url_prefix + '/shards/' + str(uuid.uuid1()) + path = self.url_prefix + '/pools/' + str(uuid.uuid1()) self.simulate_put(path, body=jsonutils.dumps({'weight': 100})) self.assertEqual(self.srmock.status, falcon.HTTP_400) @@ -110,7 +110,7 @@ class ShardsBaseTest(base.V1_1Base): @ddt.data(-1, 2**32+1, 'big') def test_put_raises_if_invalid_weight(self, weight): - path = self.url_prefix + '/shards/' + str(uuid.uuid1()) + path = self.url_prefix + '/pools/' + str(uuid.uuid1()) doc = {'weight': weight, 'uri': 'a'} self.simulate_put(path, body=jsonutils.dumps(doc)) @@ -118,84 +118,84 @@ class ShardsBaseTest(base.V1_1Base): @ddt.data(-1, 2**32+1, [], 'localhost:27017') def test_put_raises_if_invalid_uri(self, uri): - path = self.url_prefix + '/shards/' + str(uuid.uuid1()) + path = self.url_prefix + '/pools/' + str(uuid.uuid1()) self.simulate_put(path, body=jsonutils.dumps({'weight': 1, 'uri': uri})) self.assertEqual(self.srmock.status, falcon.HTTP_400) @ddt.data(-1, 'wee', []) def test_put_raises_if_invalid_options(self, options): - path = self.url_prefix + '/shards/' + str(uuid.uuid1()) + path = self.url_prefix + '/pools/' + str(uuid.uuid1()) doc = {'weight': 1, 'uri': 'a', 'options': options} self.simulate_put(path, body=jsonutils.dumps(doc)) self.assertEqual(self.srmock.status, falcon.HTTP_400) def test_put_existing_overwrites(self): - # NOTE(cabrera): setUp creates default shard + # NOTE(cabrera): setUp creates default pool expect = self.doc - self.simulate_put(self.shard, + self.simulate_put(self.pool, body=jsonutils.dumps(expect)) self.assertEqual(self.srmock.status, falcon.HTTP_201) - result = self.simulate_get(self.shard) + result = self.simulate_get(self.pool) self.assertEqual(self.srmock.status, falcon.HTTP_200) doc = jsonutils.loads(result[0]) self.assertEqual(doc['weight'], expect['weight']) self.assertEqual(doc['uri'], expect['uri']) def test_delete_works(self): - self.simulate_delete(self.shard) + self.simulate_delete(self.pool) self.assertEqual(self.srmock.status, falcon.HTTP_204) - self.simulate_get(self.shard) + self.simulate_get(self.pool) self.assertEqual(self.srmock.status, falcon.HTTP_404) def test_get_nonexisting_raises_404(self): - self.simulate_get(self.url_prefix + '/shards/nonexisting') + self.simulate_get(self.url_prefix + '/pools/nonexisting') self.assertEqual(self.srmock.status, falcon.HTTP_404) - def _shard_expect(self, shard, xhref, xweight, xuri): - self.assertIn('href', shard) - self.assertEqual(shard['href'], xhref) - self.assertIn('weight', shard) - self.assertEqual(shard['weight'], xweight) - self.assertIn('uri', shard) - self.assertEqual(shard['uri'], xuri) + def _pool_expect(self, pool, xhref, xweight, xuri): + self.assertIn('href', pool) + self.assertEqual(pool['href'], xhref) + self.assertIn('weight', pool) + self.assertEqual(pool['weight'], xweight) + self.assertIn('uri', pool) + self.assertEqual(pool['uri'], xuri) def test_get_works(self): - result = self.simulate_get(self.shard) + result = self.simulate_get(self.pool) self.assertEqual(self.srmock.status, falcon.HTTP_200) - shard = jsonutils.loads(result[0]) - self._shard_expect(shard, self.shard, self.doc['weight'], - self.doc['uri']) + pool = jsonutils.loads(result[0]) + self._pool_expect(pool, self.pool, self.doc['weight'], + self.doc['uri']) def test_detailed_get_works(self): - result = self.simulate_get(self.shard, + result = self.simulate_get(self.pool, query_string='?detailed=True') self.assertEqual(self.srmock.status, falcon.HTTP_200) - shard = jsonutils.loads(result[0]) - self._shard_expect(shard, self.shard, self.doc['weight'], - self.doc['uri']) - self.assertIn('options', shard) - self.assertEqual(shard['options'], {}) + pool = jsonutils.loads(result[0]) + self._pool_expect(pool, self.pool, self.doc['weight'], + self.doc['uri']) + self.assertIn('options', pool) + self.assertEqual(pool['options'], {}) def test_patch_raises_if_missing_fields(self): - self.simulate_patch(self.shard, + self.simulate_patch(self.pool, body=jsonutils.dumps({'location': 1})) self.assertEqual(self.srmock.status, falcon.HTTP_400) def _patch_test(self, doc): - self.simulate_patch(self.shard, + self.simulate_patch(self.pool, body=jsonutils.dumps(doc)) self.assertEqual(self.srmock.status, falcon.HTTP_200) - result = self.simulate_get(self.shard, + result = self.simulate_get(self.pool, query_string='?detailed=True') self.assertEqual(self.srmock.status, falcon.HTTP_200) - shard = jsonutils.loads(result[0]) - self._shard_expect(shard, self.shard, doc['weight'], - doc['uri']) - self.assertEqual(shard['options'], doc['options']) + pool = jsonutils.loads(result[0]) + self._pool_expect(pool, self.pool, doc['weight'], + doc['uri']) + self.assertEqual(pool['options'], doc['options']) def test_patch_works(self): doc = {'weight': 101, 'uri': 'sqlite://:memory:', 'options': {'a': 1}} @@ -208,60 +208,60 @@ class ShardsBaseTest(base.V1_1Base): @ddt.data(-1, 2**32+1, 'big') def test_patch_raises_400_on_invalid_weight(self, weight): - self.simulate_patch(self.shard, + self.simulate_patch(self.pool, body=jsonutils.dumps({'weight': weight})) self.assertEqual(self.srmock.status, falcon.HTTP_400) @ddt.data(-1, 2**32+1, [], 'localhost:27017') def test_patch_raises_400_on_invalid_uri(self, uri): - self.simulate_patch(self.shard, + self.simulate_patch(self.pool, body=jsonutils.dumps({'uri': uri})) self.assertEqual(self.srmock.status, falcon.HTTP_400) @ddt.data(-1, 'wee', []) def test_patch_raises_400_on_invalid_options(self, options): - self.simulate_patch(self.shard, + self.simulate_patch(self.pool, body=jsonutils.dumps({'options': options})) self.assertEqual(self.srmock.status, falcon.HTTP_400) - def test_patch_raises_404_if_shard_not_found(self): - self.simulate_patch(self.url_prefix + '/shards/notexists', + def test_patch_raises_404_if_pool_not_found(self): + self.simulate_patch(self.url_prefix + '/pools/notexists', body=jsonutils.dumps({'weight': 1})) self.assertEqual(self.srmock.status, falcon.HTTP_404) def test_empty_listing_returns_204(self): - self.simulate_delete(self.shard) - self.simulate_get(self.url_prefix + '/shards') + self.simulate_delete(self.pool) + self.simulate_get(self.url_prefix + '/pools') self.assertEqual(self.srmock.status, falcon.HTTP_204) def _listing_test(self, count=10, limit=10, marker=None, detailed=False): - # NOTE(cpp-cabrera): delete initial shard - it will interfere + # NOTE(cpp-cabrera): delete initial pool - it will interfere # with listing tests - self.simulate_delete(self.shard) + self.simulate_delete(self.pool) query = '?limit={0}&detailed={1}'.format(limit, detailed) if marker: query += '&marker={2}'.format(marker) - with shards(self, count, self.doc['uri']) as expected: - result = self.simulate_get(self.url_prefix + '/shards', + with pools(self, count, self.doc['uri']) as expected: + result = self.simulate_get(self.url_prefix + '/pools', query_string=query) self.assertEqual(self.srmock.status, falcon.HTTP_200) results = jsonutils.loads(result[0]) self.assertIsInstance(results, dict) - self.assertIn('shards', results) - shard_list = results['shards'] - self.assertEqual(len(shard_list), min(limit, count)) - for s in shard_list: + self.assertIn('pools', results) + pool_list = results['pools'] + self.assertEqual(len(pool_list), min(limit, count)) + for s in pool_list: # NOTE(flwang): It can't assumed that both sqlalchemy and # mongodb can return query result with the same order. Just # like the order they're inserted. Actually, sqlalchemy can't # guarantee that. So we're leveraging the relationship between - # shard weight and the index of shards fixture to get the - # right shard to verify. + # pool weight and the index of pools fixture to get the + # right pool to verify. expect = expected[s['weight']] path, weight = expect[:2] - self._shard_expect(s, path, weight, self.doc['uri']) + self._pool_expect(s, path, weight, self.doc['uri']) if detailed: self.assertIn('options', s) self.assertEqual(s['options'], expect[-1]) @@ -279,30 +279,30 @@ class ShardsBaseTest(base.V1_1Base): self._listing_test(count=15, limit=limit) def test_listing_marker_is_respected(self): - self.simulate_delete(self.shard) + self.simulate_delete(self.pool) - with shards(self, 10, self.doc['uri']) as expected: - result = self.simulate_get(self.url_prefix + '/shards', + with pools(self, 10, self.doc['uri']) as expected: + result = self.simulate_get(self.url_prefix + '/pools', query_string='?marker=3') self.assertEqual(self.srmock.status, falcon.HTTP_200) - shard_list = jsonutils.loads(result[0])['shards'] - self.assertEqual(len(shard_list), 6) + pool_list = jsonutils.loads(result[0])['pools'] + self.assertEqual(len(pool_list), 6) path, weight = expected[4][:2] - self._shard_expect(shard_list[0], path, weight, self.doc['uri']) + self._pool_expect(pool_list[0], path, weight, self.doc['uri']) -class TestShardsMongoDB(ShardsBaseTest): +class TestPoolsMongoDB(PoolsBaseTest): config_file = 'wsgi_mongodb.conf' @testing.requires_mongodb def setUp(self): - super(TestShardsMongoDB, self).setUp() + super(TestPoolsMongoDB, self).setUp() -class TestShardsSqlalchemy(ShardsBaseTest): +class TestPoolsSqlalchemy(PoolsBaseTest): config_file = 'wsgi_sqlalchemy.conf' def setUp(self): - super(TestShardsSqlalchemy, self).setUp() + super(TestPoolsSqlalchemy, self).setUp() diff --git a/tests/etc/wsgi_mongodb_sharded.conf b/tests/etc/wsgi_mongodb_pooled.conf similarity index 89% rename from tests/etc/wsgi_mongodb_sharded.conf rename to tests/etc/wsgi_mongodb_pooled.conf index f3c172582..a97c34897 100644 --- a/tests/etc/wsgi_mongodb_sharded.conf +++ b/tests/etc/wsgi_mongodb_pooled.conf @@ -1,5 +1,5 @@ [DEFAULT] -sharding = True +pooling = True [drivers] transport = wsgi diff --git a/tests/etc/wsgi_sqlalchemy_sharded.conf b/tests/etc/wsgi_sqlalchemy_pooled.conf similarity index 78% rename from tests/etc/wsgi_sqlalchemy_sharded.conf rename to tests/etc/wsgi_sqlalchemy_pooled.conf index 8f3577ef3..3b83fd11f 100644 --- a/tests/etc/wsgi_sqlalchemy_sharded.conf +++ b/tests/etc/wsgi_sqlalchemy_pooled.conf @@ -1,5 +1,5 @@ [DEFAULT] -sharding = True +pooling = True [drivers] transport = wsgi diff --git a/tests/unit/queues/storage/test_impl_mongodb.py b/tests/unit/queues/storage/test_impl_mongodb.py index 72c70212e..3ed32be86 100644 --- a/tests/unit/queues/storage/test_impl_mongodb.py +++ b/tests/unit/queues/storage/test_impl_mongodb.py @@ -364,16 +364,16 @@ class MongodbClaimTests(base.ClaimControllerTest): @testing.requires_mongodb -class MongodbShardsTests(base.ShardsControllerTest): +class MongodbPoolsTests(base.PoolsControllerTest): driver_class = mongodb.ControlDriver - controller_class = controllers.ShardsController + controller_class = controllers.PoolsController def setUp(self): - super(MongodbShardsTests, self).setUp() + super(MongodbPoolsTests, self).setUp() self.load_conf('wsgi_mongodb.conf') def tearDown(self): - super(MongodbShardsTests, self).tearDown() + super(MongodbPoolsTests, self).tearDown() @testing.requires_mongodb diff --git a/tests/unit/queues/storage/test_impl_sqlalchemy.py b/tests/unit/queues/storage/test_impl_sqlalchemy.py index 0701acfba..56c09a062 100644 --- a/tests/unit/queues/storage/test_impl_sqlalchemy.py +++ b/tests/unit/queues/storage/test_impl_sqlalchemy.py @@ -78,16 +78,16 @@ class SqlalchemyClaimTests(base.ClaimControllerTest): controller_class = controllers.ClaimController -class SqlalchemyShardsTest(base.ShardsControllerTest): +class SqlalchemyPoolsTest(base.PoolsControllerTest): driver_class = sqlalchemy.ControlDriver - controller_class = controllers.ShardsController + controller_class = controllers.PoolsController def setUp(self): - super(SqlalchemyShardsTest, self).setUp() + super(SqlalchemyPoolsTest, self).setUp() self.load_conf('wsgi_sqlalchemy.conf') def tearDown(self): - super(SqlalchemyShardsTest, self).tearDown() + super(SqlalchemyPoolsTest, self).tearDown() class SqlalchemyCatalogueTest(base.CatalogueControllerTest): diff --git a/tests/unit/queues/storage/test_shard_catalog.py b/tests/unit/queues/storage/test_pool_catalog.py similarity index 81% rename from tests/unit/queues/storage/test_shard_catalog.py rename to tests/unit/queues/storage/test_pool_catalog.py index 1821c3b27..c6b76f53a 100644 --- a/tests/unit/queues/storage/test_shard_catalog.py +++ b/tests/unit/queues/storage/test_pool_catalog.py @@ -17,7 +17,7 @@ import uuid from oslo.config import cfg from marconi.openstack.common.cache import cache as oslo_cache -from marconi.queues.storage import sharding +from marconi.queues.storage import pooling from marconi.queues.storage import sqlalchemy from marconi.queues.storage import utils from marconi import tests as testing @@ -25,14 +25,14 @@ from marconi import tests as testing # TODO(cpp-cabrera): it would be wonderful to refactor this unit test # so that it could use multiple control storage backends once those -# have shards/catalogue implementations. +# have pools/catalogue implementations. @testing.requires_mongodb -class ShardCatalogTest(testing.TestBase): +class PoolCatalogTest(testing.TestBase): - config_file = 'wsgi_mongodb_sharded.conf' + config_file = 'wsgi_mongodb_pooled.conf' def setUp(self): - super(ShardCatalogTest, self).setUp() + super(PoolCatalogTest, self).setUp() self.conf.register_opts([cfg.StrOpt('storage')], group='drivers') @@ -41,20 +41,20 @@ class ShardCatalogTest(testing.TestBase): control_mode=True) self.catalogue_ctrl = control.catalogue_controller - self.shards_ctrl = control.shards_controller + self.pools_ctrl = control.pools_controller # NOTE(cpp-cabrera): populate catalogue - self.shard = str(uuid.uuid1()) + self.pool = str(uuid.uuid1()) self.queue = str(uuid.uuid1()) self.project = str(uuid.uuid1()) - self.shards_ctrl.create(self.shard, 100, 'sqlite://:memory:') - self.catalogue_ctrl.insert(self.project, self.queue, self.shard) - self.catalog = sharding.Catalog(self.conf, cache, control) + self.pools_ctrl.create(self.pool, 100, 'sqlite://:memory:') + self.catalogue_ctrl.insert(self.project, self.queue, self.pool) + self.catalog = pooling.Catalog(self.conf, cache, control) def tearDown(self): self.catalogue_ctrl.drop_all() - self.shards_ctrl.drop_all() - super(ShardCatalogTest, self).tearDown() + self.pools_ctrl.drop_all() + super(PoolCatalogTest, self).tearDown() def test_lookup_loads_correct_driver(self): storage = self.catalog.lookup(self.queue, self.project) diff --git a/tests/unit/queues/storage/test_shard_queues.py b/tests/unit/queues/storage/test_pool_queues.py similarity index 84% rename from tests/unit/queues/storage/test_shard_queues.py rename to tests/unit/queues/storage/test_pool_queues.py index 531a794ed..b322e9d94 100644 --- a/tests/unit/queues/storage/test_shard_queues.py +++ b/tests/unit/queues/storage/test_pool_queues.py @@ -19,35 +19,35 @@ from oslo.config import cfg import six from marconi.openstack.common.cache import cache as oslo_cache -from marconi.queues.storage import sharding +from marconi.queues.storage import pooling from marconi.queues.storage import utils from marconi import tests as testing @testing.requires_mongodb -class ShardQueuesTest(testing.TestBase): +class PoolQueuesTest(testing.TestBase): def setUp(self): - super(ShardQueuesTest, self).setUp() - conf = self.load_conf('wsgi_mongodb_sharded.conf') + super(PoolQueuesTest, self).setUp() + conf = self.load_conf('wsgi_mongodb_pooled.conf') conf.register_opts([cfg.StrOpt('storage')], group='drivers') cache = oslo_cache.get_cache() control = utils.load_storage_driver(conf, cache, control_mode=True) - self.shards_ctrl = control.shards_controller - self.driver = sharding.DataDriver(conf, cache, control) + self.pools_ctrl = control.pools_controller + self.driver = pooling.DataDriver(conf, cache, control) self.controller = self.driver.queue_controller - # fake two shards + # fake two pools for _ in six.moves.xrange(2): - self.shards_ctrl.create(str(uuid.uuid1()), 100, - 'sqlite://:memory:') + self.pools_ctrl.create(str(uuid.uuid1()), 100, + 'sqlite://:memory:') def tearDown(self): - self.shards_ctrl.drop_all() - super(ShardQueuesTest, self).tearDown() + self.pools_ctrl.drop_all() + super(PoolQueuesTest, self).tearDown() def test_ping(self): ping = self.driver.is_alive() diff --git a/tests/unit/queues/transport/wsgi/test_v1_0.py b/tests/unit/queues/transport/wsgi/test_v1_0.py index 26fca7fc8..f9b915b40 100644 --- a/tests/unit/queues/transport/wsgi/test_v1_0.py +++ b/tests/unit/queues/transport/wsgi/test_v1_0.py @@ -61,7 +61,7 @@ class TestMessagesMongoDB(v1.TestMessagesMongoDB): url_prefix = URL_PREFIX -class TestMessagesMongoDBSharded(v1.TestMessagesMongoDBSharded): +class TestMessagesMongoDBPooled(v1.TestMessagesMongoDBPooled): url_prefix = URL_PREFIX @@ -81,11 +81,11 @@ class TestQueueLifecycleSqlalchemy(v1.TestQueueLifecycleSqlalchemy): url_prefix = URL_PREFIX -class TestShardsMongoDB(v1.TestShardsMongoDB): +class TestPoolsMongoDB(v1.TestPoolsMongoDB): url_prefix = URL_PREFIX -class TestShardsSqlalchemy(v1.TestShardsSqlalchemy): +class TestPoolsSqlalchemy(v1.TestPoolsSqlalchemy): url_prefix = URL_PREFIX diff --git a/tests/unit/queues/transport/wsgi/test_v1_1.py b/tests/unit/queues/transport/wsgi/test_v1_1.py index 5cb94c358..8a4ade6a1 100644 --- a/tests/unit/queues/transport/wsgi/test_v1_1.py +++ b/tests/unit/queues/transport/wsgi/test_v1_1.py @@ -60,7 +60,7 @@ class TestMessagesMongoDB(v1_1.TestMessagesMongoDB): url_prefix = URL_PREFIX -class TestMessagesMongoDBSharded(v1_1.TestMessagesMongoDBSharded): +class TestMessagesMongoDBPooled(v1_1.TestMessagesMongoDBPooled): url_prefix = URL_PREFIX @@ -82,11 +82,11 @@ class TestQueueLifecycleSqlalchemy(v1_1.TestQueueLifecycleSqlalchemy): url_prefix = URL_PREFIX -class TestShardsMongoDB(v1_1.TestShardsMongoDB): +class TestPoolsMongoDB(v1_1.TestPoolsMongoDB): url_prefix = URL_PREFIX -class TestShardsSqlalchemy(v1_1.TestShardsSqlalchemy): +class TestPoolsSqlalchemy(v1_1.TestPoolsSqlalchemy): url_prefix = URL_PREFIX diff --git a/tests/unit/test_bootstrap.py b/tests/unit/test_bootstrap.py index 4b7f0402d..2b8b51cdf 100644 --- a/tests/unit/test_bootstrap.py +++ b/tests/unit/test_bootstrap.py @@ -16,7 +16,7 @@ from marconi.common import errors from marconi.queues import bootstrap from marconi.queues.storage import pipeline -from marconi.queues.storage import sharding +from marconi.queues.storage import pooling from marconi.queues.storage import sqlalchemy from marconi.queues.transport import wsgi from marconi.tests import base @@ -39,10 +39,10 @@ class TestBootstrap(base.TestBase): self.assertIsInstance(bootstrap.storage._storage, sqlalchemy.DataDriver) - def test_storage_sqlalchemy_sharded(self): - """Makes sure we can load the shard driver.""" - bootstrap = self._bootstrap('wsgi_sqlalchemy_sharded.conf') - self.assertIsInstance(bootstrap.storage._storage, sharding.DataDriver) + def test_storage_sqlalchemy_pooled(self): + """Makes sure we can load the pool driver.""" + bootstrap = self._bootstrap('wsgi_sqlalchemy_pooled.conf') + self.assertIsInstance(bootstrap.storage._storage, pooling.DataDriver) def test_transport_invalid(self): bootstrap = self._bootstrap('drivers_transport_invalid.conf')