From f6c321dd8b3f4f921c3cbf82fc385bb6e71cf9c0 Mon Sep 17 00:00:00 2001 From: Alejandro Cabrera Date: Thu, 10 Oct 2013 09:12:53 -0400 Subject: [PATCH] feat: integrate shard storage with transport This patch refines the interface and storage implementation defined in the last patch and integrates it with the transport layer. A few updates have been made: - 'name' -> 'href' for listing shards - limiting, markers, and detailed are all used - use of common_utils.fields to clean up shards transport PATCH - add missing init for schemas - fix schema issues found: 'location' -> 'uri', __init__.py - shard resource correctly implements PUT semantics (replaces) Transport: the admin API concept has been expanded to include functionality from the public interface *in addition* to admin functionality. Part of the rationale behind this is to simplify unit testing. The other part of this is that an admin should be able to do everything a normal user can do in addition to their special functions. Storage: now divided into control and data plane. The bootstrap passes a control driver down to the transport which *can* be used for endpoints as needed. A test suite has been added that exercises the functionality from the transport side of the shard registry resource. Finally, the way the FaultyStorage driver tests were handled was changed. Something about the setattr magic in that suite's setup made it such that *all* tests would use the Faulty storage driver. This is possibly related to the use of lazy_property decorators. To address this issue, this patch promotes the faulty storage driver to setup.cfg visibility and removes the setattrs. Change-Id: I5b8cdb3a11d29422762b52f1e15e33167eecb867 Partitally-implements: blueprint storage-sharding Partially-Closes: 1241686 Closes-Bug: 1243898 --- marconi/common/schemas/__init__.py | 0 marconi/common/schemas/shards.py | 8 +- marconi/queues/bootstrap.py | 11 +- marconi/queues/storage/base.py | 14 +- marconi/queues/storage/mongodb/shards.py | 27 +- marconi/queues/storage/sharding.py | 6 +- marconi/queues/storage/sqlite/driver.py | 1 - marconi/queues/storage/sqlite/shards.py | 2 +- marconi/queues/storage/utils.py | 6 +- marconi/queues/transport/base.py | 7 +- marconi/queues/transport/wsgi/admin/driver.py | 15 +- marconi/queues/transport/wsgi/driver.py | 16 +- .../queues/transport/wsgi/public/driver.py | 3 - marconi/queues/transport/wsgi/shards.py | 84 +++--- marconi/tests/functional/base.py | 11 + marconi/tests/queues/storage/base.py | 25 +- setup.cfg | 2 + tests/etc/wsgi_faulty.conf | 2 +- tests/unit/queues/transport/wsgi/base.py | 15 +- .../unit/queues/transport/wsgi/test_shards.py | 262 ++++++++++++++++++ 20 files changed, 403 insertions(+), 114 deletions(-) create mode 100644 marconi/common/schemas/__init__.py diff --git a/marconi/common/schemas/__init__.py b/marconi/common/schemas/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/marconi/common/schemas/shards.py b/marconi/common/schemas/shards.py index 470a39816..3b4d9383d 100644 --- a/marconi/common/schemas/shards.py +++ b/marconi/common/schemas/shards.py @@ -29,9 +29,9 @@ patch_options = { # NOTE(cpp-cabrera): a string valid for use in a URI # TODO(cpp-cabrera): perhaps validate this further using jsonschema's # uri validator as per rfc3987 -patch_location = { +patch_uri = { 'type': 'object', 'properties': { - 'location': { + 'uri': { 'type': 'string' }, 'additionalProperties': False @@ -50,11 +50,11 @@ patch_weight = { create = { 'type': 'object', 'properties': { 'weight': patch_weight['properties']['weight'], - 'location': patch_location['properties']['location'], + 'uri': patch_uri['properties']['uri'], 'options': patch_options['properties']['options'] }, # NOTE(cpp-cabrera): options need not be present. Storage drivers # must provide reasonable defaults. - 'required': ['location', 'weight'], + 'required': ['uri', 'weight'], 'additionalProperties': False } diff --git a/marconi/queues/bootstrap.py b/marconi/queues/bootstrap.py index 779cc4a00..9f170fab9 100644 --- a/marconi/queues/bootstrap.py +++ b/marconi/queues/bootstrap.py @@ -74,6 +74,12 @@ class Bootstrap(object): LOG.debug(_(u'Loading storage pipeline')) return pipeline.DataDriver(self.conf, storage_driver) + @decorators.lazy_property(write=False) + def control(self): + LOG.debug(_(u'Loading storage control driver')) + return storage_utils.load_storage_driver(self.conf, + control_mode=True) + @decorators.lazy_property(write=False) def cache(self): LOG.debug(_(u'Loading Proxy Cache Driver')) @@ -89,13 +95,12 @@ class Bootstrap(object): transport_name = self.driver_conf.transport LOG.debug(_(u'Loading transport driver: %s'), transport_name) + args = [self.conf, self.storage, self.cache, self.control] try: mgr = driver.DriverManager(self._transport_type, transport_name, invoke_on_load=True, - invoke_args=[self.conf, - self.storage, - self.cache]) + invoke_args=args) return mgr.driver except RuntimeError as exc: LOG.exception(exc) diff --git a/marconi/queues/storage/base.py b/marconi/queues/storage/base.py index 9ff19a9d9..e7b6a4360 100644 --- a/marconi/queues/storage/base.py +++ b/marconi/queues/storage/base.py @@ -81,7 +81,6 @@ class ControlDriverBase(object): Allows access to the shard registry through a catalogue and a shard controller. - """ def __init__(self, conf): @@ -373,19 +372,8 @@ class ClaimBase(ControllerBase): raise NotImplementedError -class AdminControllerBase(object): - """Top-level class for controllers. - - :param driver: Instance of the driver - instantiating this controller. - """ - - def __init__(self, driver): - self.driver = driver - - @six.add_metaclass(abc.ABCMeta) -class ShardsBase(AdminControllerBase): +class ShardsBase(ControllerBase): """A controller for managing shards.""" @abc.abstractmethod diff --git a/marconi/queues/storage/mongodb/shards.py b/marconi/queues/storage/mongodb/shards.py index bccec9107..0dd5c6e4f 100644 --- a/marconi/queues/storage/mongodb/shards.py +++ b/marconi/queues/storage/mongodb/shards.py @@ -24,6 +24,8 @@ Schema: 'o': options :: dict """ +import functools + from marconi.common import utils as common_utils from marconi.queues.storage import base, errors from marconi.queues.storage.mongodb import utils @@ -34,11 +36,11 @@ SHARDS_INDEX = [ # NOTE(cpp-cabrera): used for get/list operations. There's no need to # show the marker or the _id - they're implementation details. -OMIT_FIELDS = (('_id', 0),) +OMIT_FIELDS = (('_id', False),) def _field_spec(detailed=False): - return dict(OMIT_FIELDS + (() if detailed else (('o', 0),))) + return dict(OMIT_FIELDS + (() if detailed else (('o', False),))) class ShardsController(base.ShardsBase): @@ -58,8 +60,10 @@ class ShardsController(base.ShardsBase): if marker is not None: query['n'] = {'$gt': marker} - return self._col.find(query, fields=_field_spec(detailed), - limit=limit) + cursor = self._col.find(query, fields=_field_spec(detailed), + limit=limit) + normalizer = functools.partial(_normalize, detailed=detailed) + return utils.HookedCursor(cursor, normalizer) @utils.raises_conn_error def get(self, name, detailed=False): @@ -67,7 +71,8 @@ class ShardsController(base.ShardsBase): _field_spec(detailed)) if not res: raise errors.ShardDoesNotExist(name) - return res + + return _normalize(res, detailed) @utils.raises_conn_error def create(self, name, weight, uri, options=None): @@ -102,3 +107,15 @@ class ShardsController(base.ShardsBase): def drop_all(self): self._col.drop() self._col.ensure_index(SHARDS_INDEX, unique=True) + + +def _normalize(shard, detailed=False): + ret = { + 'name': shard['n'], + 'uri': shard['u'], + 'weight': shard['w'], + } + if detailed: + ret['options'] = shard['o'] + + return ret diff --git a/marconi/queues/storage/sharding.py b/marconi/queues/storage/sharding.py index 67caeb71b..9f71314d5 100644 --- a/marconi/queues/storage/sharding.py +++ b/marconi/queues/storage/sharding.py @@ -73,7 +73,7 @@ class RoutingController(storage.base.ControllerBase): @decorators.cached_getattr def __getattr__(self, name): # NOTE(kgriffs): Use a closure trick to avoid - # some attr lookups each time foward() is called. + # some attr lookups each time forward() is called. lookup = self._shard_catalog.lookup # NOTE(kgriffs): Assume that every controller method @@ -144,14 +144,10 @@ class Catalog(object): # TODO(kgriffs): SHARDING - Read options from catalog backend conf = cfg.ConfigOpts() - general_opts = [ - cfg.BoolOpt('admin_mode', default=False) - ] options = [ cfg.StrOpt('storage', default='sqlite'), ] - conf.register_opts(general_opts) conf.register_opts(options, group='queues:drivers') return utils.load_storage_driver(conf) diff --git a/marconi/queues/storage/sqlite/driver.py b/marconi/queues/storage/sqlite/driver.py index f0882183e..1bcc28ad8 100644 --- a/marconi/queues/storage/sqlite/driver.py +++ b/marconi/queues/storage/sqlite/driver.py @@ -212,7 +212,6 @@ class ControlDriver(storage.ControlDriverBase): self.__conn = sqlite3.connect(self.__path, detect_types=sqlite3.PARSE_DECLTYPES) self.__db = self.__conn.cursor() - self.run('''PRAGMA foreign_keys = ON''') @property def catalogue_controller(self): diff --git a/marconi/queues/storage/sqlite/shards.py b/marconi/queues/storage/sqlite/shards.py index 9aecd3bc6..8e59194bc 100644 --- a/marconi/queues/storage/sqlite/shards.py +++ b/marconi/queues/storage/sqlite/shards.py @@ -22,7 +22,7 @@ class ShardsController(base.ShardsBase): def list(self, marker=None, limit=10, detailed=False): pass - def get(self, name): + def get(self, name, detailed=False): pass def create(self, name, weight, uri, options=None): diff --git a/marconi/queues/storage/utils.py b/marconi/queues/storage/utils.py index c51b7755a..985c9cf51 100644 --- a/marconi/queues/storage/utils.py +++ b/marconi/queues/storage/utils.py @@ -22,7 +22,7 @@ from marconi.openstack.common import log LOG = log.getLogger(__name__) -def load_storage_driver(conf): +def load_storage_driver(conf, control_mode=False): """Loads a storage driver and returns it. The driver's initializer will be passed conf as its only arg. @@ -31,8 +31,10 @@ def load_storage_driver(conf): driver. Must include a 'queues:drivers' group. """ + mode = 'control' if control_mode else 'data' + driver_type = 'marconi.queues.{0}.storage'.format(mode) try: - mgr = driver.DriverManager('marconi.queues.data.storage', + mgr = driver.DriverManager(driver_type, conf['queues:drivers'].storage, invoke_on_load=True, invoke_args=[conf]) diff --git a/marconi/queues/transport/base.py b/marconi/queues/transport/base.py index 139c00d75..80b2814d9 100644 --- a/marconi/queues/transport/base.py +++ b/marconi/queues/transport/base.py @@ -30,15 +30,18 @@ class DriverBase(object): :param conf: configuration instance :type conf: oslo.config.cfg.CONF :param storage: The storage driver - :type storage: marconi.queues.storage.base.DriverBase + :type storage: marconi.queues.storage.base.DataDriverBase :param cache: caching object :type cache: marconi.common.cache.backends.BaseCache + :param control: Storage driver to handle the control plane + :type control: marconi.queues.storage.base.ControlDriverBase """ - def __init__(self, conf, storage, cache): + def __init__(self, conf, storage, cache, control): self._conf = conf self._storage = storage self._cache = cache + self._control = control self._conf.register_opts(_TRANSPORT_OPTIONS) diff --git a/marconi/queues/transport/wsgi/admin/driver.py b/marconi/queues/transport/wsgi/admin/driver.py index 5f6231a3f..775196945 100644 --- a/marconi/queues/transport/wsgi/admin/driver.py +++ b/marconi/queues/transport/wsgi/admin/driver.py @@ -15,19 +15,20 @@ """marconi-queues (admin): interface for managing partitions.""" from marconi.common.transport.wsgi import health -from marconi.queues.transport.wsgi import driver from marconi.queues.transport.wsgi.public import driver as public_driver +from marconi.queues.transport.wsgi import shards -class Driver(driver.DriverBase): - - def __init__(self, conf, storage, cache): - super(Driver, self).__init__(conf, storage, cache) - self.public = public_driver.Driver(conf, storage, cache) +class Driver(public_driver.Driver): @property def bridge(self): - return self.public.bridge + [ + shards_controller = self._control.shards_controller + return super(Driver, self).bridge + [ + ('/shards', + shards.Listing(shards_controller)), + ('/shards/{shard}', + shards.Resource(shards_controller)), ('/health', health.Resource()) ] diff --git a/marconi/queues/transport/wsgi/driver.py b/marconi/queues/transport/wsgi/driver.py index a30bb14ca..b1973c8d7 100644 --- a/marconi/queues/transport/wsgi/driver.py +++ b/marconi/queues/transport/wsgi/driver.py @@ -21,6 +21,7 @@ import falcon from oslo.config import cfg import six +from marconi.common import decorators from marconi.common.transport import version from marconi.common.transport.wsgi import helpers import marconi.openstack.common.log as logging @@ -46,8 +47,8 @@ LOG = logging.getLogger(__name__) @six.add_metaclass(abc.ABCMeta) class DriverBase(transport.DriverBase): - def __init__(self, conf, storage, cache): - super(DriverBase, self).__init__(conf, storage, cache) + def __init__(self, conf, storage, cache, control): + super(DriverBase, self).__init__(conf, storage, cache, control) self._conf.register_opts(_WSGI_OPTIONS, group=_WSGI_GROUP) self._wsgi_conf = self._conf[_WSGI_GROUP] @@ -57,9 +58,10 @@ class DriverBase(transport.DriverBase): self._init_routes() self._init_middleware() - def _init_routes(self): - """Initialize hooks and URI routes to resources.""" - before_hooks = [ + @decorators.lazy_property(write=False) + def before_hooks(self): + """Exposed to facilitate unit testing.""" + return [ helpers.require_accepts_json, helpers.extract_project_id, @@ -68,7 +70,9 @@ class DriverBase(transport.DriverBase): self._validate.queue_name) ] - self.app = falcon.API(before=before_hooks) + def _init_routes(self): + """Initialize hooks and URI routes to resources.""" + self.app = falcon.API(before=self.before_hooks) version_path = version.path() for route, resource in self.bridge: self.app.add_route(version_path + route, resource) diff --git a/marconi/queues/transport/wsgi/public/driver.py b/marconi/queues/transport/wsgi/public/driver.py index d6099f56b..91cc1169f 100644 --- a/marconi/queues/transport/wsgi/public/driver.py +++ b/marconi/queues/transport/wsgi/public/driver.py @@ -25,9 +25,6 @@ from marconi.queues.transport.wsgi import ( class Driver(driver.DriverBase): - def __init__(self, conf, storage, cache): - super(Driver, self).__init__(conf, storage, cache) - @property def bridge(self): queue_controller = self._storage.queue_controller diff --git a/marconi/queues/transport/wsgi/shards.py b/marconi/queues/transport/wsgi/shards.py index 9d6855c0a..a700ab6cc 100644 --- a/marconi/queues/transport/wsgi/shards.py +++ b/marconi/queues/transport/wsgi/shards.py @@ -22,7 +22,7 @@ following fields are required: { "name": string, "weight": integer, - "location": string::uri + "uri": string::uri } Furthermore, depending on the underlying storage type of shard being @@ -38,8 +38,9 @@ import jsonschema from marconi.common.schemas import shards as schema from marconi.common.transport.wsgi import utils +from marconi.common import utils as common_utils from marconi.openstack.common import log -from marconi.proxy.storage import errors +from marconi.queues.storage import errors from marconi.queues.transport import utils as transport_utils from marconi.queues.transport.wsgi import errors as wsgi_errors @@ -47,93 +48,104 @@ LOG = log.getLogger(__name__) class Listing(object): - """A resource to list registered partition + """A resource to list registered shards - :param partitions_controller: means to interact with storage + :param shards_controller: means to interact with storage """ - def __init__(self, partitions_controller): - self._ctrl = partitions_controller + def __init__(self, shards_controller): + self._ctrl = shards_controller - def on_get(self, request, response): - """Returns a partition listing as a JSON object: + def on_get(self, request, response, project_id): + """Returns a shard listing as objects embedded in an array: [ - {"name": "", "weight": 100, "location": ""}, + {"href": "", "weight": 100, "uri": ""}, ... ] :returns: HTTP | [200, 204] """ LOG.debug(u'LIST shards') - resp = list(self._ctrl.list()) - if not resp: + store = {} + request.get_param('marker', store=store) + request.get_param_as_int('limit', store=store) + request.get_param_as_bool('detailed', store=store) + + results = {} + results['shards'] = list(self._ctrl.list(**store)) + for entry in results['shards']: + entry['href'] = request.path + '/' + entry.pop('name') + + if not results['shards']: response.status = falcon.HTTP_204 return - response.body = transport_utils.to_json(resp) + response.content_location = request.relative_uri + response.body = transport_utils.to_json(results) response.status = falcon.HTTP_200 class Resource(object): - """A handler for individual partitions + """A handler for individual shard. - :param partitions_controller: means to interact with storage + :param shards_controller: means to interact with storage """ def __init__(self, shards_controller): self._ctrl = shards_controller validator_type = jsonschema.Draft4Validator self._validators = { 'weight': validator_type(schema.patch_weight), - 'location': validator_type(schema.patch_location), + 'uri': validator_type(schema.patch_uri), 'options': validator_type(schema.patch_options), 'create': validator_type(schema.create) } - def on_get(self, request, response, shard): + def on_get(self, request, response, project_id, shard): """Returns a JSON object for a single shard entry: - {"weight": 100, "location": "", options: {...}} + {"weight": 100, "uri": "", options: {...}} :returns: HTTP | [200, 404] """ LOG.debug(u'GET shard - name: %s', shard) data = None + detailed = request.get_param_as_bool('detailed') or False + try: - data = self._ctrl.get(shard) + data = self._ctrl.get(shard, detailed) + except errors.ShardDoesNotExist as ex: - LOG.exception(ex) + LOG.debug(ex) raise falcon.HTTPNotFound() + data['href'] = request.path + # remove the name entry - it isn't needed on GET del data['name'] response.body = transport_utils.to_json(data) - response.content_location = request.path + response.content_location = request.relative_uri - def on_put(self, request, response, shard): + def on_put(self, request, response, project_id, shard): """Registers a new shard. Expects the following input: - {"weight": 100, "location": ""} + {"weight": 100, "uri": ""} An options object may also be provided. :returns: HTTP | [201, 204] """ LOG.debug(u'PUT shard - name: %s', shard) - if self._ctrl.exists(shard): - LOG.debug(u'Shard %s already exists', shard) - response.status = falcon.HTTP_204 - return data = utils.load(request) utils.validate(self._validators['create'], data) self._ctrl.create(shard, weight=data['weight'], - location=data['location'], + uri=data['uri'], options=data.get('options', {})) response.status = falcon.HTTP_201 response.location = request.path - def on_delete(self, request, response, shard): + def on_delete(self, request, response, project_id, shard): """Deregisters a shard. :returns: HTTP | 204 @@ -142,11 +154,11 @@ class Resource(object): self._ctrl.delete(shard) response.status = falcon.HTTP_204 - def on_patch(self, request, response, shard): - """Allows one to update a shard's weight, location, and/or options. + def on_patch(self, request, response, project_id, shard): + """Allows one to update a shard's weight, uri, and/or options. This method expects the user to submit a JSON object - containing atleast one of: 'hosts', 'weight', 'options'. If + containing atleast one of: 'uri', 'weight', 'options'. If none are found, the request is flagged as bad. There is also strict format checking through the use of jsonschema. Appropriate errors are returned in each case for @@ -157,21 +169,21 @@ class Resource(object): LOG.debug(u'PATCH shard - name: %s', shard) data = utils.load(request) - EXPECT = ('weight', 'location', 'options') + EXPECT = ('weight', 'uri', 'options') if not any([(field in data) for field in EXPECT]): LOG.debug(u'PATCH shard, bad params') raise wsgi_errors.HTTPBadRequestBody( - 'One of `location`, `weight`, or `options` needs ' + 'One of `uri`, `weight`, or `options` needs ' 'to be specified' ) for field in EXPECT: utils.validate(self._validators[field], data) - try: - fields = dict((k, v) for k, v in data.items() - if k in EXPECT and v is not None) + fields = common_utils.fields(data, EXPECT, + pred=lambda v: v is not None) + try: self._ctrl.update(shard, **fields) except errors.ShardDoesNotExist as ex: LOG.exception(ex) diff --git a/marconi/tests/functional/base.py b/marconi/tests/functional/base.py index 0d80c96f5..6df738b04 100644 --- a/marconi/tests/functional/base.py +++ b/marconi/tests/functional/base.py @@ -226,3 +226,14 @@ class MarconiServer(Server): def get_target(self, conf): server = bootstrap.Bootstrap(conf) return server.run + + +class MarconiAdminServer(Server): + + name = "marconi-admin-wsgiref-test-server" + + def get_target(self, conf): + conf.admin_mode = True + server = bootstrap.Bootstrap(conf) + conf.admin_mode = False + return server.run diff --git a/marconi/tests/queues/storage/base.py b/marconi/tests/queues/storage/base.py index 01e9cae4d..edeab68aa 100644 --- a/marconi/tests/queues/storage/base.py +++ b/marconi/tests/queues/storage/base.py @@ -639,22 +639,22 @@ class ShardsControllerTest(ControllerBaseTest): xlocation='localhost2') def _shard_expects(self, shard, xname, xweight, xlocation): - self.assertIn('n', shard) - self.assertEqual(shard['n'], xname) - self.assertIn('w', shard) - self.assertEqual(shard['w'], xweight) - self.assertIn('u', shard) - self.assertEqual(shard['u'], xlocation) + self.assertIn('name', shard) + self.assertEqual(shard['name'], xname) + self.assertIn('weight', shard) + self.assertEqual(shard['weight'], xweight) + self.assertIn('uri', shard) + self.assertEqual(shard['uri'], xlocation) def test_get_returns_expected_content(self): res = self.shards_controller.get(self.shard) self._shard_expects(res, self.shard, 100, 'localhost') - self.assertNotIn('o', res) + self.assertNotIn('options', res) def test_detailed_get_returns_expected_content(self): res = self.shards_controller.get(self.shard, detailed=True) - self.assertIn('o', res) - self.assertEqual(res['o'], {}) + self.assertIn('options', res) + self.assertEqual(res['options'], {}) def test_get_raises_if_not_found(self): self.assertRaises(storage.errors.ShardDoesNotExist, @@ -674,7 +674,7 @@ class ShardsControllerTest(ControllerBaseTest): options={'a': 1}) res = self.shards_controller.get(self.shard, detailed=True) self._shard_expects(res, self.shard, 101, 'redis://localhost') - self.assertEqual(res['o'], {'a': 1}) + self.assertEqual(res['options'], {'a': 1}) def test_delete_works(self): self.shards_controller.delete(self.shard) @@ -699,6 +699,7 @@ class ShardsControllerTest(ControllerBaseTest): self.assertEqual(len(res), 10) for i, entry in enumerate(res): self._shard_expects(entry, str(i), i, str(i)) + self.assertNotIn('options', entry) res = list(self.shards_controller.list(limit=5)) self.assertEqual(len(res), 5) @@ -710,8 +711,8 @@ class ShardsControllerTest(ControllerBaseTest): self.assertEqual(len(res), 10) for i, entry in enumerate(res): self._shard_expects(entry, str(i), i, str(i)) - self.assertIn('o', entry) - self.assertEqual(entry['o'], {}) + self.assertIn('options', entry) + self.assertEqual(entry['options'], {}) class CatalogueControllerTest(ControllerBaseTest): diff --git a/setup.cfg b/setup.cfg index 72f73b2a5..cee55111b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -29,10 +29,12 @@ console_scripts = marconi.queues.data.storage = sqlite = marconi.queues.storage.sqlite.driver:DataDriver mongodb = marconi.queues.storage.mongodb.driver:DataDriver + faulty = marconi.tests.faulty_storage:DataDriver marconi.queues.control.storage = sqlite = marconi.queues.storage.sqlite.driver:ControlDriver mongodb = marconi.queues.storage.mongodb.driver:ControlDriver + faulty = marconi.tests.faulty_storage:ControlDriver marconi.queues.public.transport = wsgi = marconi.queues.transport.wsgi.public.driver:Driver diff --git a/tests/etc/wsgi_faulty.conf b/tests/etc/wsgi_faulty.conf index 6ee0d82df..90cd941ec 100644 --- a/tests/etc/wsgi_faulty.conf +++ b/tests/etc/wsgi_faulty.conf @@ -4,7 +4,7 @@ verbose = False [queues:drivers] transport = wsgi -storage = sqlite +storage = faulty [queues:drivers:transport:wsgi] port = 8888 diff --git a/tests/unit/queues/transport/wsgi/base.py b/tests/unit/queues/transport/wsgi/base.py index 88f8a2422..118a1c04b 100644 --- a/tests/unit/queues/transport/wsgi/base.py +++ b/tests/unit/queues/transport/wsgi/base.py @@ -15,12 +15,10 @@ # limitations under the License. from falcon import testing as ftest -from oslo.config import cfg from marconi.queues import bootstrap from marconi.queues.transport.wsgi import driver from marconi import tests as testing -from marconi.tests import faulty_storage class TestBase(testing.TestBase): @@ -38,9 +36,10 @@ class TestBase(testing.TestBase): group=driver._WSGI_GROUP) self.wsgi_cfg = conf[driver._WSGI_GROUP] + conf.admin_mode = True self.boot = bootstrap.Bootstrap(conf) - self.app = self.boot.transport.app + self.srmock = ftest.StartResponseMock() def simulate_request(self, path, project_id=None, **kwargs): @@ -97,13 +96,3 @@ class TestBase(testing.TestBase): class TestBaseFaulty(TestBase): """This test ensures we aren't letting any exceptions go unhandled.""" - - def setUp(self): - self._storage_backup = bootstrap.Bootstrap.storage - faulty = faulty_storage.DataDriver(cfg.ConfigOpts()) - setattr(bootstrap.Bootstrap, 'storage', faulty) - super(TestBaseFaulty, self).setUp() - - def tearDown(self): - setattr(bootstrap.Bootstrap, 'storage', self._storage_backup) - super(TestBaseFaulty, self).tearDown() diff --git a/tests/unit/queues/transport/wsgi/test_shards.py b/tests/unit/queues/transport/wsgi/test_shards.py index 15edbd519..7cbc2aabe 100644 --- a/tests/unit/queues/transport/wsgi/test_shards.py +++ b/tests/unit/queues/transport/wsgi/test_shards.py @@ -14,9 +14,69 @@ # See the License for the specific language governing permissions and # limitations under the License. +import contextlib +import json +import uuid + import ddt +import falcon import base # noqa +from marconi import tests as testing + + +@contextlib.contextmanager +def shard(test, name, weight, uri, options={}): + """A context manager for constructing a shard for use in testing. + + Deletes the shard after exiting the context. + + :param test: Must expose simulate_* methods + :param name: Name for this shard + :type name: six.text_type + :type weight: int + :type uri: six.text_type + :type options: dict + :returns: (name, weight, uri, options) + :rtype: see above + """ + doc = {'weight': weight, 'uri': uri, 'options': options} + path = '/v1/shards/' + name + + test.simulate_put(path, body=json.dumps(doc)) + + try: + yield name, weight, uri, options + + finally: + test.simulate_delete(path) + + +@contextlib.contextmanager +def shards(test, count): + """A context manager for constructing shards for use in testing. + + Deletes the shards after exiting the context. + + :param test: Must expose simulate_* methods + :param count: Number of shards to create + :type count: int + :returns: (paths, weights, uris, options) + :rtype: ([six.text_type], [int], [six.text_type], [dict]) + """ + base = '/v1/shards/' + args = [(base + str(i), i, + str(i), {str(i): i}) + for i in range(count)] + for path, weight, uri, option in args: + doc = {'weight': weight, 'uri': uri, 'options': option} + test.simulate_put(path, body=json.dumps(doc)) + + try: + yield args + finally: + for path, _, _, _ in args: + test.simulate_delete(path) @ddt.ddt @@ -24,6 +84,208 @@ class ShardsBaseTest(base.TestBase): def setUp(self): super(ShardsBaseTest, self).setUp() + self.doc = {'weight': 100, 'uri': 'localhost'} + self.shard = '/v1/shards/' + str(uuid.uuid1()) + self.simulate_put(self.shard, body=json.dumps(self.doc)) + self.assertEqual(self.srmock.status, falcon.HTTP_201) def tearDown(self): super(ShardsBaseTest, self).tearDown() + self.simulate_delete(self.shard) + self.assertEqual(self.srmock.status, falcon.HTTP_204) + + def test_put_shard_works(self): + name = str(uuid.uuid1()) + with shard(self, name, 100, 'localhost'): + self.assertEqual(self.srmock.status, falcon.HTTP_201) + + def test_put_raises_if_missing_fields(self): + path = '/v1/shards/' + str(uuid.uuid1()) + self.simulate_put(path, body=json.dumps({'weight': 100})) + self.assertEqual(self.srmock.status, falcon.HTTP_400) + + self.simulate_put(path, body=json.dumps({'uri': 'localhost'})) + self.assertEqual(self.srmock.status, falcon.HTTP_400) + + @ddt.data(-1, 2**32+1, 'big') + def test_put_raises_if_invalid_weight(self, weight): + path = '/v1/shards/' + str(uuid.uuid1()) + doc = {'weight': weight, 'uri': 'a'} + self.simulate_put(path, + body=json.dumps(doc)) + self.assertEqual(self.srmock.status, falcon.HTTP_400) + + @ddt.data(-1, 2**32+1, []) + def test_put_raises_if_invalid_uri(self, uri): + path = '/v1/shards/' + str(uuid.uuid1()) + self.simulate_put(path, + body=json.dumps({'weight': 1, 'uri': uri})) + self.assertEqual(self.srmock.status, falcon.HTTP_400) + + @ddt.data(-1, 'wee', []) + def test_put_raises_if_invalid_options(self, options): + path = '/v1/shards/' + str(uuid.uuid1()) + doc = {'weight': 1, 'uri': 'a', 'options': options} + self.simulate_put(path, body=json.dumps(doc)) + self.assertEqual(self.srmock.status, falcon.HTTP_400) + + def test_put_existing_overwrites(self): + doc = {'weight': 20, 'uri': 'awesome'} + self.simulate_put(self.shard, + body=json.dumps(doc)) + self.assertEqual(self.srmock.status, falcon.HTTP_201) + + result = self.simulate_get(self.shard) + self.assertEqual(self.srmock.status, falcon.HTTP_200) + doc = json.loads(result[0]) + self.assertEqual(doc['weight'], 20) + self.assertEqual(doc['uri'], 'awesome') + + def test_delete_works(self): + self.simulate_delete(self.shard) + self.assertEqual(self.srmock.status, falcon.HTTP_204) + + self.simulate_get(self.shard) + self.assertEqual(self.srmock.status, falcon.HTTP_404) + + def test_get_nonexisting_raises_404(self): + self.simulate_get('/v1/shards/nonexisting') + self.assertEqual(self.srmock.status, falcon.HTTP_404) + + def _shard_expect(self, shard, xhref, xweight, xuri): + self.assertIn('href', shard) + self.assertEqual(shard['href'], xhref) + self.assertIn('weight', shard) + self.assertEqual(shard['weight'], xweight) + self.assertIn('uri', shard) + self.assertEqual(shard['uri'], xuri) + + def test_get_works(self): + result = self.simulate_get(self.shard) + self.assertEqual(self.srmock.status, falcon.HTTP_200) + shard = json.loads(result[0]) + self._shard_expect(shard, self.shard, self.doc['weight'], + self.doc['uri']) + + def test_detailed_get_works(self): + result = self.simulate_get(self.shard, + query_string='?detailed=True') + self.assertEqual(self.srmock.status, falcon.HTTP_200) + shard = json.loads(result[0]) + self._shard_expect(shard, self.shard, self.doc['weight'], + self.doc['uri']) + self.assertIn('options', shard) + self.assertEqual(shard['options'], {}) + + def test_patch_raises_if_missing_fields(self): + self.simulate_patch(self.shard, + body=json.dumps({'location': 1})) + self.assertEqual(self.srmock.status, falcon.HTTP_400) + + def _patch_test(self, doc): + self.simulate_patch(self.shard, + body=json.dumps(doc)) + self.assertEqual(self.srmock.status, falcon.HTTP_200) + + result = self.simulate_get(self.shard, + query_string='?detailed=True') + self.assertEqual(self.srmock.status, falcon.HTTP_200) + shard = json.loads(result[0]) + self._shard_expect(shard, self.shard, doc['weight'], + doc['uri']) + self.assertEqual(shard['options'], doc['options']) + + def test_patch_works(self): + doc = {'weight': 101, 'uri': 'remotehost', 'options': {'a': 1}} + self._patch_test(doc) + + def test_patch_works_with_extra_fields(self): + doc = {'weight': 101, 'uri': 'remotehost', 'options': {'a': 1}, + 'location': 100, 'partition': 'taco'} + self._patch_test(doc) + + @ddt.data(-1, 2**32+1, 'big') + def test_patch_raises_400_on_invalid_weight(self, weight): + self.simulate_patch(self.shard, + body=json.dumps({'weight': weight})) + self.assertEqual(self.srmock.status, falcon.HTTP_400) + + @ddt.data(-1, 2**32+1, []) + def test_patch_raises_400_on_invalid_uri(self, uri): + self.simulate_patch(self.shard, + body=json.dumps({'uri': uri})) + self.assertEqual(self.srmock.status, falcon.HTTP_400) + + @ddt.data(-1, 'wee', []) + def test_patch_raises_400_on_invalid_options(self, options): + self.simulate_patch(self.shard, + body=json.dumps({'options': options})) + self.assertEqual(self.srmock.status, falcon.HTTP_400) + + def test_patch_raises_404_if_shard_not_found(self): + self.simulate_patch('/v1/shards/notexists', + body=json.dumps({'weight': 1})) + self.assertEqual(self.srmock.status, falcon.HTTP_404) + + def test_empty_listing_returns_204(self): + self.simulate_delete(self.shard) + self.simulate_get('/v1/shards') + self.assertEqual(self.srmock.status, falcon.HTTP_204) + + def _listing_test(self, count=10, limit=10, + marker=None, detailed=False): + # NOTE(cpp-cabrera): delete initial shard - it will interfere + # with listing tests + self.simulate_delete(self.shard) + query = '?limit={0}&detailed={1}'.format(limit, detailed) + if marker: + query += '&marker={2}'.format(marker) + + with shards(self, count) as expected: + result = self.simulate_get('/v1/shards', + query_string=query) + self.assertEqual(self.srmock.status, falcon.HTTP_200) + results = json.loads(result[0]) + self.assertIsInstance(results, dict) + self.assertIn('shards', results) + shard_list = results['shards'] + self.assertEqual(len(shard_list), min(limit, count)) + for (i, s), expect in zip(enumerate(shard_list), expected): + path, weight, uri = expect[:3] + self._shard_expect(s, path, weight, uri) + if detailed: + self.assertIn('options', s) + self.assertEqual(s['options'], expect[-1]) + else: + self.assertNotIn('options', s) + + def test_listing_works(self): + self._listing_test() + + def test_detailed_listing_works(self): + self._listing_test(detailed=True) + + @ddt.data(1, 5, 10, 15) + def test_listing_works_with_limit(self, limit): + self._listing_test(count=15, limit=limit) + + def test_listing_marker_is_respected(self): + self.simulate_delete(self.shard) + + with shards(self, 10) as expected: + result = self.simulate_get('/v1/shards', + query_string='?marker=3') + self.assertEqual(self.srmock.status, falcon.HTTP_200) + shard_list = json.loads(result[0])['shards'] + self.assertEqual(len(shard_list), 6) + path, weight, uri = expected[4][:3] + self._shard_expect(shard_list[0], path, weight, uri) + + +@testing.requires_mongodb +class ShardsMongoDBTests(ShardsBaseTest): + + config_filename = 'wsgi_mongodb.conf' + + def setUp(self): + super(ShardsMongoDBTests, self).setUp()