diff --git a/marconi/common/schemas/__init__.py b/marconi/common/schemas/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/marconi/common/schemas/shards.py b/marconi/common/schemas/shards.py index 470a39816..3b4d9383d 100644 --- a/marconi/common/schemas/shards.py +++ b/marconi/common/schemas/shards.py @@ -29,9 +29,9 @@ patch_options = { # NOTE(cpp-cabrera): a string valid for use in a URI # TODO(cpp-cabrera): perhaps validate this further using jsonschema's # uri validator as per rfc3987 -patch_location = { +patch_uri = { 'type': 'object', 'properties': { - 'location': { + 'uri': { 'type': 'string' }, 'additionalProperties': False @@ -50,11 +50,11 @@ patch_weight = { create = { 'type': 'object', 'properties': { 'weight': patch_weight['properties']['weight'], - 'location': patch_location['properties']['location'], + 'uri': patch_uri['properties']['uri'], 'options': patch_options['properties']['options'] }, # NOTE(cpp-cabrera): options need not be present. Storage drivers # must provide reasonable defaults. - 'required': ['location', 'weight'], + 'required': ['uri', 'weight'], 'additionalProperties': False } diff --git a/marconi/queues/bootstrap.py b/marconi/queues/bootstrap.py index 779cc4a00..9f170fab9 100644 --- a/marconi/queues/bootstrap.py +++ b/marconi/queues/bootstrap.py @@ -74,6 +74,12 @@ class Bootstrap(object): LOG.debug(_(u'Loading storage pipeline')) return pipeline.DataDriver(self.conf, storage_driver) + @decorators.lazy_property(write=False) + def control(self): + LOG.debug(_(u'Loading storage control driver')) + return storage_utils.load_storage_driver(self.conf, + control_mode=True) + @decorators.lazy_property(write=False) def cache(self): LOG.debug(_(u'Loading Proxy Cache Driver')) @@ -89,13 +95,12 @@ class Bootstrap(object): transport_name = self.driver_conf.transport LOG.debug(_(u'Loading transport driver: %s'), transport_name) + args = [self.conf, self.storage, self.cache, self.control] try: mgr = driver.DriverManager(self._transport_type, transport_name, invoke_on_load=True, - invoke_args=[self.conf, - self.storage, - self.cache]) + invoke_args=args) return mgr.driver except RuntimeError as exc: LOG.exception(exc) diff --git a/marconi/queues/storage/base.py b/marconi/queues/storage/base.py index 9ff19a9d9..e7b6a4360 100644 --- a/marconi/queues/storage/base.py +++ b/marconi/queues/storage/base.py @@ -81,7 +81,6 @@ class ControlDriverBase(object): Allows access to the shard registry through a catalogue and a shard controller. - """ def __init__(self, conf): @@ -373,19 +372,8 @@ class ClaimBase(ControllerBase): raise NotImplementedError -class AdminControllerBase(object): - """Top-level class for controllers. - - :param driver: Instance of the driver - instantiating this controller. - """ - - def __init__(self, driver): - self.driver = driver - - @six.add_metaclass(abc.ABCMeta) -class ShardsBase(AdminControllerBase): +class ShardsBase(ControllerBase): """A controller for managing shards.""" @abc.abstractmethod diff --git a/marconi/queues/storage/mongodb/shards.py b/marconi/queues/storage/mongodb/shards.py index bccec9107..0dd5c6e4f 100644 --- a/marconi/queues/storage/mongodb/shards.py +++ b/marconi/queues/storage/mongodb/shards.py @@ -24,6 +24,8 @@ Schema: 'o': options :: dict """ +import functools + from marconi.common import utils as common_utils from marconi.queues.storage import base, errors from marconi.queues.storage.mongodb import utils @@ -34,11 +36,11 @@ SHARDS_INDEX = [ # NOTE(cpp-cabrera): used for get/list operations. There's no need to # show the marker or the _id - they're implementation details. -OMIT_FIELDS = (('_id', 0),) +OMIT_FIELDS = (('_id', False),) def _field_spec(detailed=False): - return dict(OMIT_FIELDS + (() if detailed else (('o', 0),))) + return dict(OMIT_FIELDS + (() if detailed else (('o', False),))) class ShardsController(base.ShardsBase): @@ -58,8 +60,10 @@ class ShardsController(base.ShardsBase): if marker is not None: query['n'] = {'$gt': marker} - return self._col.find(query, fields=_field_spec(detailed), - limit=limit) + cursor = self._col.find(query, fields=_field_spec(detailed), + limit=limit) + normalizer = functools.partial(_normalize, detailed=detailed) + return utils.HookedCursor(cursor, normalizer) @utils.raises_conn_error def get(self, name, detailed=False): @@ -67,7 +71,8 @@ class ShardsController(base.ShardsBase): _field_spec(detailed)) if not res: raise errors.ShardDoesNotExist(name) - return res + + return _normalize(res, detailed) @utils.raises_conn_error def create(self, name, weight, uri, options=None): @@ -102,3 +107,15 @@ class ShardsController(base.ShardsBase): def drop_all(self): self._col.drop() self._col.ensure_index(SHARDS_INDEX, unique=True) + + +def _normalize(shard, detailed=False): + ret = { + 'name': shard['n'], + 'uri': shard['u'], + 'weight': shard['w'], + } + if detailed: + ret['options'] = shard['o'] + + return ret diff --git a/marconi/queues/storage/sharding.py b/marconi/queues/storage/sharding.py index 67caeb71b..9f71314d5 100644 --- a/marconi/queues/storage/sharding.py +++ b/marconi/queues/storage/sharding.py @@ -73,7 +73,7 @@ class RoutingController(storage.base.ControllerBase): @decorators.cached_getattr def __getattr__(self, name): # NOTE(kgriffs): Use a closure trick to avoid - # some attr lookups each time foward() is called. + # some attr lookups each time forward() is called. lookup = self._shard_catalog.lookup # NOTE(kgriffs): Assume that every controller method @@ -144,14 +144,10 @@ class Catalog(object): # TODO(kgriffs): SHARDING - Read options from catalog backend conf = cfg.ConfigOpts() - general_opts = [ - cfg.BoolOpt('admin_mode', default=False) - ] options = [ cfg.StrOpt('storage', default='sqlite'), ] - conf.register_opts(general_opts) conf.register_opts(options, group='queues:drivers') return utils.load_storage_driver(conf) diff --git a/marconi/queues/storage/sqlite/driver.py b/marconi/queues/storage/sqlite/driver.py index f0882183e..1bcc28ad8 100644 --- a/marconi/queues/storage/sqlite/driver.py +++ b/marconi/queues/storage/sqlite/driver.py @@ -212,7 +212,6 @@ class ControlDriver(storage.ControlDriverBase): self.__conn = sqlite3.connect(self.__path, detect_types=sqlite3.PARSE_DECLTYPES) self.__db = self.__conn.cursor() - self.run('''PRAGMA foreign_keys = ON''') @property def catalogue_controller(self): diff --git a/marconi/queues/storage/sqlite/shards.py b/marconi/queues/storage/sqlite/shards.py index 9aecd3bc6..8e59194bc 100644 --- a/marconi/queues/storage/sqlite/shards.py +++ b/marconi/queues/storage/sqlite/shards.py @@ -22,7 +22,7 @@ class ShardsController(base.ShardsBase): def list(self, marker=None, limit=10, detailed=False): pass - def get(self, name): + def get(self, name, detailed=False): pass def create(self, name, weight, uri, options=None): diff --git a/marconi/queues/storage/utils.py b/marconi/queues/storage/utils.py index c51b7755a..985c9cf51 100644 --- a/marconi/queues/storage/utils.py +++ b/marconi/queues/storage/utils.py @@ -22,7 +22,7 @@ from marconi.openstack.common import log LOG = log.getLogger(__name__) -def load_storage_driver(conf): +def load_storage_driver(conf, control_mode=False): """Loads a storage driver and returns it. The driver's initializer will be passed conf as its only arg. @@ -31,8 +31,10 @@ def load_storage_driver(conf): driver. Must include a 'queues:drivers' group. """ + mode = 'control' if control_mode else 'data' + driver_type = 'marconi.queues.{0}.storage'.format(mode) try: - mgr = driver.DriverManager('marconi.queues.data.storage', + mgr = driver.DriverManager(driver_type, conf['queues:drivers'].storage, invoke_on_load=True, invoke_args=[conf]) diff --git a/marconi/queues/transport/base.py b/marconi/queues/transport/base.py index 139c00d75..80b2814d9 100644 --- a/marconi/queues/transport/base.py +++ b/marconi/queues/transport/base.py @@ -30,15 +30,18 @@ class DriverBase(object): :param conf: configuration instance :type conf: oslo.config.cfg.CONF :param storage: The storage driver - :type storage: marconi.queues.storage.base.DriverBase + :type storage: marconi.queues.storage.base.DataDriverBase :param cache: caching object :type cache: marconi.common.cache.backends.BaseCache + :param control: Storage driver to handle the control plane + :type control: marconi.queues.storage.base.ControlDriverBase """ - def __init__(self, conf, storage, cache): + def __init__(self, conf, storage, cache, control): self._conf = conf self._storage = storage self._cache = cache + self._control = control self._conf.register_opts(_TRANSPORT_OPTIONS) diff --git a/marconi/queues/transport/wsgi/admin/driver.py b/marconi/queues/transport/wsgi/admin/driver.py index 5f6231a3f..775196945 100644 --- a/marconi/queues/transport/wsgi/admin/driver.py +++ b/marconi/queues/transport/wsgi/admin/driver.py @@ -15,19 +15,20 @@ """marconi-queues (admin): interface for managing partitions.""" from marconi.common.transport.wsgi import health -from marconi.queues.transport.wsgi import driver from marconi.queues.transport.wsgi.public import driver as public_driver +from marconi.queues.transport.wsgi import shards -class Driver(driver.DriverBase): - - def __init__(self, conf, storage, cache): - super(Driver, self).__init__(conf, storage, cache) - self.public = public_driver.Driver(conf, storage, cache) +class Driver(public_driver.Driver): @property def bridge(self): - return self.public.bridge + [ + shards_controller = self._control.shards_controller + return super(Driver, self).bridge + [ + ('/shards', + shards.Listing(shards_controller)), + ('/shards/{shard}', + shards.Resource(shards_controller)), ('/health', health.Resource()) ] diff --git a/marconi/queues/transport/wsgi/driver.py b/marconi/queues/transport/wsgi/driver.py index a30bb14ca..b1973c8d7 100644 --- a/marconi/queues/transport/wsgi/driver.py +++ b/marconi/queues/transport/wsgi/driver.py @@ -21,6 +21,7 @@ import falcon from oslo.config import cfg import six +from marconi.common import decorators from marconi.common.transport import version from marconi.common.transport.wsgi import helpers import marconi.openstack.common.log as logging @@ -46,8 +47,8 @@ LOG = logging.getLogger(__name__) @six.add_metaclass(abc.ABCMeta) class DriverBase(transport.DriverBase): - def __init__(self, conf, storage, cache): - super(DriverBase, self).__init__(conf, storage, cache) + def __init__(self, conf, storage, cache, control): + super(DriverBase, self).__init__(conf, storage, cache, control) self._conf.register_opts(_WSGI_OPTIONS, group=_WSGI_GROUP) self._wsgi_conf = self._conf[_WSGI_GROUP] @@ -57,9 +58,10 @@ class DriverBase(transport.DriverBase): self._init_routes() self._init_middleware() - def _init_routes(self): - """Initialize hooks and URI routes to resources.""" - before_hooks = [ + @decorators.lazy_property(write=False) + def before_hooks(self): + """Exposed to facilitate unit testing.""" + return [ helpers.require_accepts_json, helpers.extract_project_id, @@ -68,7 +70,9 @@ class DriverBase(transport.DriverBase): self._validate.queue_name) ] - self.app = falcon.API(before=before_hooks) + def _init_routes(self): + """Initialize hooks and URI routes to resources.""" + self.app = falcon.API(before=self.before_hooks) version_path = version.path() for route, resource in self.bridge: self.app.add_route(version_path + route, resource) diff --git a/marconi/queues/transport/wsgi/public/driver.py b/marconi/queues/transport/wsgi/public/driver.py index d6099f56b..91cc1169f 100644 --- a/marconi/queues/transport/wsgi/public/driver.py +++ b/marconi/queues/transport/wsgi/public/driver.py @@ -25,9 +25,6 @@ from marconi.queues.transport.wsgi import ( class Driver(driver.DriverBase): - def __init__(self, conf, storage, cache): - super(Driver, self).__init__(conf, storage, cache) - @property def bridge(self): queue_controller = self._storage.queue_controller diff --git a/marconi/queues/transport/wsgi/shards.py b/marconi/queues/transport/wsgi/shards.py index 9d6855c0a..a700ab6cc 100644 --- a/marconi/queues/transport/wsgi/shards.py +++ b/marconi/queues/transport/wsgi/shards.py @@ -22,7 +22,7 @@ following fields are required: { "name": string, "weight": integer, - "location": string::uri + "uri": string::uri } Furthermore, depending on the underlying storage type of shard being @@ -38,8 +38,9 @@ import jsonschema from marconi.common.schemas import shards as schema from marconi.common.transport.wsgi import utils +from marconi.common import utils as common_utils from marconi.openstack.common import log -from marconi.proxy.storage import errors +from marconi.queues.storage import errors from marconi.queues.transport import utils as transport_utils from marconi.queues.transport.wsgi import errors as wsgi_errors @@ -47,93 +48,104 @@ LOG = log.getLogger(__name__) class Listing(object): - """A resource to list registered partition + """A resource to list registered shards - :param partitions_controller: means to interact with storage + :param shards_controller: means to interact with storage """ - def __init__(self, partitions_controller): - self._ctrl = partitions_controller + def __init__(self, shards_controller): + self._ctrl = shards_controller - def on_get(self, request, response): - """Returns a partition listing as a JSON object: + def on_get(self, request, response, project_id): + """Returns a shard listing as objects embedded in an array: [ - {"name": "", "weight": 100, "location": ""}, + {"href": "", "weight": 100, "uri": ""}, ... ] :returns: HTTP | [200, 204] """ LOG.debug(u'LIST shards') - resp = list(self._ctrl.list()) - if not resp: + store = {} + request.get_param('marker', store=store) + request.get_param_as_int('limit', store=store) + request.get_param_as_bool('detailed', store=store) + + results = {} + results['shards'] = list(self._ctrl.list(**store)) + for entry in results['shards']: + entry['href'] = request.path + '/' + entry.pop('name') + + if not results['shards']: response.status = falcon.HTTP_204 return - response.body = transport_utils.to_json(resp) + response.content_location = request.relative_uri + response.body = transport_utils.to_json(results) response.status = falcon.HTTP_200 class Resource(object): - """A handler for individual partitions + """A handler for individual shard. - :param partitions_controller: means to interact with storage + :param shards_controller: means to interact with storage """ def __init__(self, shards_controller): self._ctrl = shards_controller validator_type = jsonschema.Draft4Validator self._validators = { 'weight': validator_type(schema.patch_weight), - 'location': validator_type(schema.patch_location), + 'uri': validator_type(schema.patch_uri), 'options': validator_type(schema.patch_options), 'create': validator_type(schema.create) } - def on_get(self, request, response, shard): + def on_get(self, request, response, project_id, shard): """Returns a JSON object for a single shard entry: - {"weight": 100, "location": "", options: {...}} + {"weight": 100, "uri": "", options: {...}} :returns: HTTP | [200, 404] """ LOG.debug(u'GET shard - name: %s', shard) data = None + detailed = request.get_param_as_bool('detailed') or False + try: - data = self._ctrl.get(shard) + data = self._ctrl.get(shard, detailed) + except errors.ShardDoesNotExist as ex: - LOG.exception(ex) + LOG.debug(ex) raise falcon.HTTPNotFound() + data['href'] = request.path + # remove the name entry - it isn't needed on GET del data['name'] response.body = transport_utils.to_json(data) - response.content_location = request.path + response.content_location = request.relative_uri - def on_put(self, request, response, shard): + def on_put(self, request, response, project_id, shard): """Registers a new shard. Expects the following input: - {"weight": 100, "location": ""} + {"weight": 100, "uri": ""} An options object may also be provided. :returns: HTTP | [201, 204] """ LOG.debug(u'PUT shard - name: %s', shard) - if self._ctrl.exists(shard): - LOG.debug(u'Shard %s already exists', shard) - response.status = falcon.HTTP_204 - return data = utils.load(request) utils.validate(self._validators['create'], data) self._ctrl.create(shard, weight=data['weight'], - location=data['location'], + uri=data['uri'], options=data.get('options', {})) response.status = falcon.HTTP_201 response.location = request.path - def on_delete(self, request, response, shard): + def on_delete(self, request, response, project_id, shard): """Deregisters a shard. :returns: HTTP | 204 @@ -142,11 +154,11 @@ class Resource(object): self._ctrl.delete(shard) response.status = falcon.HTTP_204 - def on_patch(self, request, response, shard): - """Allows one to update a shard's weight, location, and/or options. + def on_patch(self, request, response, project_id, shard): + """Allows one to update a shard's weight, uri, and/or options. This method expects the user to submit a JSON object - containing atleast one of: 'hosts', 'weight', 'options'. If + containing atleast one of: 'uri', 'weight', 'options'. If none are found, the request is flagged as bad. There is also strict format checking through the use of jsonschema. Appropriate errors are returned in each case for @@ -157,21 +169,21 @@ class Resource(object): LOG.debug(u'PATCH shard - name: %s', shard) data = utils.load(request) - EXPECT = ('weight', 'location', 'options') + EXPECT = ('weight', 'uri', 'options') if not any([(field in data) for field in EXPECT]): LOG.debug(u'PATCH shard, bad params') raise wsgi_errors.HTTPBadRequestBody( - 'One of `location`, `weight`, or `options` needs ' + 'One of `uri`, `weight`, or `options` needs ' 'to be specified' ) for field in EXPECT: utils.validate(self._validators[field], data) - try: - fields = dict((k, v) for k, v in data.items() - if k in EXPECT and v is not None) + fields = common_utils.fields(data, EXPECT, + pred=lambda v: v is not None) + try: self._ctrl.update(shard, **fields) except errors.ShardDoesNotExist as ex: LOG.exception(ex) diff --git a/marconi/tests/functional/base.py b/marconi/tests/functional/base.py index 0d80c96f5..6df738b04 100644 --- a/marconi/tests/functional/base.py +++ b/marconi/tests/functional/base.py @@ -226,3 +226,14 @@ class MarconiServer(Server): def get_target(self, conf): server = bootstrap.Bootstrap(conf) return server.run + + +class MarconiAdminServer(Server): + + name = "marconi-admin-wsgiref-test-server" + + def get_target(self, conf): + conf.admin_mode = True + server = bootstrap.Bootstrap(conf) + conf.admin_mode = False + return server.run diff --git a/marconi/tests/queues/storage/base.py b/marconi/tests/queues/storage/base.py index 01e9cae4d..edeab68aa 100644 --- a/marconi/tests/queues/storage/base.py +++ b/marconi/tests/queues/storage/base.py @@ -639,22 +639,22 @@ class ShardsControllerTest(ControllerBaseTest): xlocation='localhost2') def _shard_expects(self, shard, xname, xweight, xlocation): - self.assertIn('n', shard) - self.assertEqual(shard['n'], xname) - self.assertIn('w', shard) - self.assertEqual(shard['w'], xweight) - self.assertIn('u', shard) - self.assertEqual(shard['u'], xlocation) + self.assertIn('name', shard) + self.assertEqual(shard['name'], xname) + self.assertIn('weight', shard) + self.assertEqual(shard['weight'], xweight) + self.assertIn('uri', shard) + self.assertEqual(shard['uri'], xlocation) def test_get_returns_expected_content(self): res = self.shards_controller.get(self.shard) self._shard_expects(res, self.shard, 100, 'localhost') - self.assertNotIn('o', res) + self.assertNotIn('options', res) def test_detailed_get_returns_expected_content(self): res = self.shards_controller.get(self.shard, detailed=True) - self.assertIn('o', res) - self.assertEqual(res['o'], {}) + self.assertIn('options', res) + self.assertEqual(res['options'], {}) def test_get_raises_if_not_found(self): self.assertRaises(storage.errors.ShardDoesNotExist, @@ -674,7 +674,7 @@ class ShardsControllerTest(ControllerBaseTest): options={'a': 1}) res = self.shards_controller.get(self.shard, detailed=True) self._shard_expects(res, self.shard, 101, 'redis://localhost') - self.assertEqual(res['o'], {'a': 1}) + self.assertEqual(res['options'], {'a': 1}) def test_delete_works(self): self.shards_controller.delete(self.shard) @@ -699,6 +699,7 @@ class ShardsControllerTest(ControllerBaseTest): self.assertEqual(len(res), 10) for i, entry in enumerate(res): self._shard_expects(entry, str(i), i, str(i)) + self.assertNotIn('options', entry) res = list(self.shards_controller.list(limit=5)) self.assertEqual(len(res), 5) @@ -710,8 +711,8 @@ class ShardsControllerTest(ControllerBaseTest): self.assertEqual(len(res), 10) for i, entry in enumerate(res): self._shard_expects(entry, str(i), i, str(i)) - self.assertIn('o', entry) - self.assertEqual(entry['o'], {}) + self.assertIn('options', entry) + self.assertEqual(entry['options'], {}) class CatalogueControllerTest(ControllerBaseTest): diff --git a/setup.cfg b/setup.cfg index 72f73b2a5..cee55111b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -29,10 +29,12 @@ console_scripts = marconi.queues.data.storage = sqlite = marconi.queues.storage.sqlite.driver:DataDriver mongodb = marconi.queues.storage.mongodb.driver:DataDriver + faulty = marconi.tests.faulty_storage:DataDriver marconi.queues.control.storage = sqlite = marconi.queues.storage.sqlite.driver:ControlDriver mongodb = marconi.queues.storage.mongodb.driver:ControlDriver + faulty = marconi.tests.faulty_storage:ControlDriver marconi.queues.public.transport = wsgi = marconi.queues.transport.wsgi.public.driver:Driver diff --git a/tests/etc/wsgi_faulty.conf b/tests/etc/wsgi_faulty.conf index 6ee0d82df..90cd941ec 100644 --- a/tests/etc/wsgi_faulty.conf +++ b/tests/etc/wsgi_faulty.conf @@ -4,7 +4,7 @@ verbose = False [queues:drivers] transport = wsgi -storage = sqlite +storage = faulty [queues:drivers:transport:wsgi] port = 8888 diff --git a/tests/unit/queues/transport/wsgi/base.py b/tests/unit/queues/transport/wsgi/base.py index 88f8a2422..118a1c04b 100644 --- a/tests/unit/queues/transport/wsgi/base.py +++ b/tests/unit/queues/transport/wsgi/base.py @@ -15,12 +15,10 @@ # limitations under the License. from falcon import testing as ftest -from oslo.config import cfg from marconi.queues import bootstrap from marconi.queues.transport.wsgi import driver from marconi import tests as testing -from marconi.tests import faulty_storage class TestBase(testing.TestBase): @@ -38,9 +36,10 @@ class TestBase(testing.TestBase): group=driver._WSGI_GROUP) self.wsgi_cfg = conf[driver._WSGI_GROUP] + conf.admin_mode = True self.boot = bootstrap.Bootstrap(conf) - self.app = self.boot.transport.app + self.srmock = ftest.StartResponseMock() def simulate_request(self, path, project_id=None, **kwargs): @@ -97,13 +96,3 @@ class TestBase(testing.TestBase): class TestBaseFaulty(TestBase): """This test ensures we aren't letting any exceptions go unhandled.""" - - def setUp(self): - self._storage_backup = bootstrap.Bootstrap.storage - faulty = faulty_storage.DataDriver(cfg.ConfigOpts()) - setattr(bootstrap.Bootstrap, 'storage', faulty) - super(TestBaseFaulty, self).setUp() - - def tearDown(self): - setattr(bootstrap.Bootstrap, 'storage', self._storage_backup) - super(TestBaseFaulty, self).tearDown() diff --git a/tests/unit/queues/transport/wsgi/test_shards.py b/tests/unit/queues/transport/wsgi/test_shards.py index 15edbd519..7cbc2aabe 100644 --- a/tests/unit/queues/transport/wsgi/test_shards.py +++ b/tests/unit/queues/transport/wsgi/test_shards.py @@ -14,9 +14,69 @@ # See the License for the specific language governing permissions and # limitations under the License. +import contextlib +import json +import uuid + import ddt +import falcon import base # noqa +from marconi import tests as testing + + +@contextlib.contextmanager +def shard(test, name, weight, uri, options={}): + """A context manager for constructing a shard for use in testing. + + Deletes the shard after exiting the context. + + :param test: Must expose simulate_* methods + :param name: Name for this shard + :type name: six.text_type + :type weight: int + :type uri: six.text_type + :type options: dict + :returns: (name, weight, uri, options) + :rtype: see above + """ + doc = {'weight': weight, 'uri': uri, 'options': options} + path = '/v1/shards/' + name + + test.simulate_put(path, body=json.dumps(doc)) + + try: + yield name, weight, uri, options + + finally: + test.simulate_delete(path) + + +@contextlib.contextmanager +def shards(test, count): + """A context manager for constructing shards for use in testing. + + Deletes the shards after exiting the context. + + :param test: Must expose simulate_* methods + :param count: Number of shards to create + :type count: int + :returns: (paths, weights, uris, options) + :rtype: ([six.text_type], [int], [six.text_type], [dict]) + """ + base = '/v1/shards/' + args = [(base + str(i), i, + str(i), {str(i): i}) + for i in range(count)] + for path, weight, uri, option in args: + doc = {'weight': weight, 'uri': uri, 'options': option} + test.simulate_put(path, body=json.dumps(doc)) + + try: + yield args + finally: + for path, _, _, _ in args: + test.simulate_delete(path) @ddt.ddt @@ -24,6 +84,208 @@ class ShardsBaseTest(base.TestBase): def setUp(self): super(ShardsBaseTest, self).setUp() + self.doc = {'weight': 100, 'uri': 'localhost'} + self.shard = '/v1/shards/' + str(uuid.uuid1()) + self.simulate_put(self.shard, body=json.dumps(self.doc)) + self.assertEqual(self.srmock.status, falcon.HTTP_201) def tearDown(self): super(ShardsBaseTest, self).tearDown() + self.simulate_delete(self.shard) + self.assertEqual(self.srmock.status, falcon.HTTP_204) + + def test_put_shard_works(self): + name = str(uuid.uuid1()) + with shard(self, name, 100, 'localhost'): + self.assertEqual(self.srmock.status, falcon.HTTP_201) + + def test_put_raises_if_missing_fields(self): + path = '/v1/shards/' + str(uuid.uuid1()) + self.simulate_put(path, body=json.dumps({'weight': 100})) + self.assertEqual(self.srmock.status, falcon.HTTP_400) + + self.simulate_put(path, body=json.dumps({'uri': 'localhost'})) + self.assertEqual(self.srmock.status, falcon.HTTP_400) + + @ddt.data(-1, 2**32+1, 'big') + def test_put_raises_if_invalid_weight(self, weight): + path = '/v1/shards/' + str(uuid.uuid1()) + doc = {'weight': weight, 'uri': 'a'} + self.simulate_put(path, + body=json.dumps(doc)) + self.assertEqual(self.srmock.status, falcon.HTTP_400) + + @ddt.data(-1, 2**32+1, []) + def test_put_raises_if_invalid_uri(self, uri): + path = '/v1/shards/' + str(uuid.uuid1()) + self.simulate_put(path, + body=json.dumps({'weight': 1, 'uri': uri})) + self.assertEqual(self.srmock.status, falcon.HTTP_400) + + @ddt.data(-1, 'wee', []) + def test_put_raises_if_invalid_options(self, options): + path = '/v1/shards/' + str(uuid.uuid1()) + doc = {'weight': 1, 'uri': 'a', 'options': options} + self.simulate_put(path, body=json.dumps(doc)) + self.assertEqual(self.srmock.status, falcon.HTTP_400) + + def test_put_existing_overwrites(self): + doc = {'weight': 20, 'uri': 'awesome'} + self.simulate_put(self.shard, + body=json.dumps(doc)) + self.assertEqual(self.srmock.status, falcon.HTTP_201) + + result = self.simulate_get(self.shard) + self.assertEqual(self.srmock.status, falcon.HTTP_200) + doc = json.loads(result[0]) + self.assertEqual(doc['weight'], 20) + self.assertEqual(doc['uri'], 'awesome') + + def test_delete_works(self): + self.simulate_delete(self.shard) + self.assertEqual(self.srmock.status, falcon.HTTP_204) + + self.simulate_get(self.shard) + self.assertEqual(self.srmock.status, falcon.HTTP_404) + + def test_get_nonexisting_raises_404(self): + self.simulate_get('/v1/shards/nonexisting') + self.assertEqual(self.srmock.status, falcon.HTTP_404) + + def _shard_expect(self, shard, xhref, xweight, xuri): + self.assertIn('href', shard) + self.assertEqual(shard['href'], xhref) + self.assertIn('weight', shard) + self.assertEqual(shard['weight'], xweight) + self.assertIn('uri', shard) + self.assertEqual(shard['uri'], xuri) + + def test_get_works(self): + result = self.simulate_get(self.shard) + self.assertEqual(self.srmock.status, falcon.HTTP_200) + shard = json.loads(result[0]) + self._shard_expect(shard, self.shard, self.doc['weight'], + self.doc['uri']) + + def test_detailed_get_works(self): + result = self.simulate_get(self.shard, + query_string='?detailed=True') + self.assertEqual(self.srmock.status, falcon.HTTP_200) + shard = json.loads(result[0]) + self._shard_expect(shard, self.shard, self.doc['weight'], + self.doc['uri']) + self.assertIn('options', shard) + self.assertEqual(shard['options'], {}) + + def test_patch_raises_if_missing_fields(self): + self.simulate_patch(self.shard, + body=json.dumps({'location': 1})) + self.assertEqual(self.srmock.status, falcon.HTTP_400) + + def _patch_test(self, doc): + self.simulate_patch(self.shard, + body=json.dumps(doc)) + self.assertEqual(self.srmock.status, falcon.HTTP_200) + + result = self.simulate_get(self.shard, + query_string='?detailed=True') + self.assertEqual(self.srmock.status, falcon.HTTP_200) + shard = json.loads(result[0]) + self._shard_expect(shard, self.shard, doc['weight'], + doc['uri']) + self.assertEqual(shard['options'], doc['options']) + + def test_patch_works(self): + doc = {'weight': 101, 'uri': 'remotehost', 'options': {'a': 1}} + self._patch_test(doc) + + def test_patch_works_with_extra_fields(self): + doc = {'weight': 101, 'uri': 'remotehost', 'options': {'a': 1}, + 'location': 100, 'partition': 'taco'} + self._patch_test(doc) + + @ddt.data(-1, 2**32+1, 'big') + def test_patch_raises_400_on_invalid_weight(self, weight): + self.simulate_patch(self.shard, + body=json.dumps({'weight': weight})) + self.assertEqual(self.srmock.status, falcon.HTTP_400) + + @ddt.data(-1, 2**32+1, []) + def test_patch_raises_400_on_invalid_uri(self, uri): + self.simulate_patch(self.shard, + body=json.dumps({'uri': uri})) + self.assertEqual(self.srmock.status, falcon.HTTP_400) + + @ddt.data(-1, 'wee', []) + def test_patch_raises_400_on_invalid_options(self, options): + self.simulate_patch(self.shard, + body=json.dumps({'options': options})) + self.assertEqual(self.srmock.status, falcon.HTTP_400) + + def test_patch_raises_404_if_shard_not_found(self): + self.simulate_patch('/v1/shards/notexists', + body=json.dumps({'weight': 1})) + self.assertEqual(self.srmock.status, falcon.HTTP_404) + + def test_empty_listing_returns_204(self): + self.simulate_delete(self.shard) + self.simulate_get('/v1/shards') + self.assertEqual(self.srmock.status, falcon.HTTP_204) + + def _listing_test(self, count=10, limit=10, + marker=None, detailed=False): + # NOTE(cpp-cabrera): delete initial shard - it will interfere + # with listing tests + self.simulate_delete(self.shard) + query = '?limit={0}&detailed={1}'.format(limit, detailed) + if marker: + query += '&marker={2}'.format(marker) + + with shards(self, count) as expected: + result = self.simulate_get('/v1/shards', + query_string=query) + self.assertEqual(self.srmock.status, falcon.HTTP_200) + results = json.loads(result[0]) + self.assertIsInstance(results, dict) + self.assertIn('shards', results) + shard_list = results['shards'] + self.assertEqual(len(shard_list), min(limit, count)) + for (i, s), expect in zip(enumerate(shard_list), expected): + path, weight, uri = expect[:3] + self._shard_expect(s, path, weight, uri) + if detailed: + self.assertIn('options', s) + self.assertEqual(s['options'], expect[-1]) + else: + self.assertNotIn('options', s) + + def test_listing_works(self): + self._listing_test() + + def test_detailed_listing_works(self): + self._listing_test(detailed=True) + + @ddt.data(1, 5, 10, 15) + def test_listing_works_with_limit(self, limit): + self._listing_test(count=15, limit=limit) + + def test_listing_marker_is_respected(self): + self.simulate_delete(self.shard) + + with shards(self, 10) as expected: + result = self.simulate_get('/v1/shards', + query_string='?marker=3') + self.assertEqual(self.srmock.status, falcon.HTTP_200) + shard_list = json.loads(result[0])['shards'] + self.assertEqual(len(shard_list), 6) + path, weight, uri = expected[4][:3] + self._shard_expect(shard_list[0], path, weight, uri) + + +@testing.requires_mongodb +class ShardsMongoDBTests(ShardsBaseTest): + + config_filename = 'wsgi_mongodb.conf' + + def setUp(self): + super(ShardsMongoDBTests, self).setUp()