feat: split queues API into public/admin

This change is made in prepartion for the upcoming sharded storage
features. Shard registration is a feature that only operators should
be able to do, and since the sharding is done within the queues
application, it was necessary to break this out into a separate API.

This patch adds a new configuration variable: admin_mode. It is used
to multiplex which version of the API is loaded. Furthermore, the
admin API is an enhanced version of the public API in that it allows
every route that the public API in addition to admin-only
endpoints. This should ease unit testing in future patches.

A few small refactorings were made, including:
- health resource moved to common transport location
- version module moved to common/transport
- pass config from bootstrap to transport driver
- pass cache in base transport driver
- convert base transport driver to use six.add_metaclass
- pass public bootstrap to bootstrap unit test

Change-Id: I0d6ff381afb25adb8a4b784a60b6d6eb71431245
Partially-implements: blueprint storage-sharding
This commit is contained in:
Alejandro Cabrera 2013-10-09 10:52:30 -04:00
parent 3e0ffc4f49
commit 13d7974cb9
19 changed files with 174 additions and 99 deletions

View File

@ -17,6 +17,9 @@ log_file = /var/log/marconi/queues.log
# Set to True to enable sharding across multiple storage backends # Set to True to enable sharding across multiple storage backends
;sharding = False ;sharding = False
# Set to True to activate endpoints to manage the shard registry
;admin_mode = False
# ================= Syslog Options ============================ # ================= Syslog Options ============================
# Send logs to syslog (/dev/log) instead of to file specified # Send logs to syslog (/dev/log) instead of to file specified

View File

@ -17,10 +17,10 @@
import falcon import falcon
class HealthResource(object): class Resource(object):
def on_get(self, req, resp, project_id): def on_get(self, req, resp, **kwargs):
resp.status = falcon.HTTP_204 resp.status = falcon.HTTP_204
def on_head(self, req, resp, project_id): def on_head(self, req, resp, **kwargs):
resp.status = falcon.HTTP_204 resp.status = falcon.HTTP_204

View File

@ -15,8 +15,9 @@
"""marconi-proxy (admin): interface for managing partitions.""" """marconi-proxy (admin): interface for managing partitions."""
from marconi.common.transport.wsgi import health
from marconi.proxy.transport.wsgi import ( from marconi.proxy.transport.wsgi import (
catalogue, driver, health, partitions, catalogue, driver, partitions,
) )

View File

@ -22,10 +22,10 @@ import falcon
from oslo.config import cfg from oslo.config import cfg
import six import six
from marconi.common.transport import version
from marconi.common.transport.wsgi import helpers from marconi.common.transport.wsgi import helpers
import marconi.openstack.common.log as logging import marconi.openstack.common.log as logging
from marconi.proxy import transport from marconi.proxy import transport
from marconi.proxy.transport.wsgi import version
from marconi.proxy.utils import round_robin from marconi.proxy.utils import round_robin
from marconi.queues.transport import auth from marconi.queues.transport import auth

View File

@ -17,15 +17,16 @@
Forwards requests to the appropriate marconi queues server. Forwards requests to the appropriate marconi queues server.
""" """
from marconi.common.transport.wsgi import health
from marconi.proxy.transport.wsgi import ( from marconi.proxy.transport.wsgi import (
driver, forward, health, metadata, driver, forward, metadata,
queues, v1 queues, v1
) )
class Driver(driver.DriverBase): class Driver(driver.DriverBase):
def __init(self, storage, cache): def __init__(self, storage, cache):
super(Driver, self).__init__(storage, cache) super(Driver, self).__init__(storage, cache)
@property @property

View File

@ -16,6 +16,7 @@
from oslo.config import cfg from oslo.config import cfg
from stevedore import driver from stevedore import driver
from marconi.common.cache import cache as oslo_cache
from marconi.common import decorators from marconi.common import decorators
from marconi.common import exceptions from marconi.common import exceptions
from marconi.openstack.common import log from marconi.openstack.common import log
@ -29,6 +30,8 @@ LOG = log.getLogger(__name__)
_GENERAL_OPTIONS = [ _GENERAL_OPTIONS = [
cfg.BoolOpt('sharding', default=False, cfg.BoolOpt('sharding', default=False,
help='Enable sharding across multiple storage backends'), help='Enable sharding across multiple storage backends'),
cfg.BoolOpt('admin_mode', default=False,
help='Activate endpoints to manage shard registry.'),
] ]
_DRIVER_OPTIONS = [ _DRIVER_OPTIONS = [
@ -55,6 +58,8 @@ class Bootstrap(object):
self.driver_conf = self.conf[_DRIVER_GROUP] self.driver_conf = self.conf[_DRIVER_GROUP]
log.setup('marconi') log.setup('marconi')
mode = 'admin' if self.conf.admin_mode else 'public'
self._transport_type = 'marconi.queues.{0}.transport'.format(mode)
@decorators.lazy_property(write=False) @decorators.lazy_property(write=False)
def storage(self): def storage(self):
@ -69,16 +74,28 @@ class Bootstrap(object):
LOG.debug(_(u'Loading storage pipeline')) LOG.debug(_(u'Loading storage pipeline'))
return pipeline.Driver(self.conf, storage_driver) return pipeline.Driver(self.conf, storage_driver)
@decorators.lazy_property(write=False)
def cache(self):
LOG.debug(_(u'Loading Proxy Cache Driver'))
try:
mgr = oslo_cache.get_cache(self.conf)
return mgr
except RuntimeError as exc:
LOG.exception(exc)
raise exceptions.InvalidDriver(exc)
@decorators.lazy_property(write=False) @decorators.lazy_property(write=False)
def transport(self): def transport(self):
transport_name = self.driver_conf.transport transport_name = self.driver_conf.transport
LOG.debug(_(u'Loading transport driver: ') + transport_name) LOG.debug(_(u'Loading transport driver: ') + transport_name)
try: try:
mgr = driver.DriverManager('marconi.queues.transport', mgr = driver.DriverManager(self._transport_type,
transport_name, transport_name,
invoke_on_load=True, invoke_on_load=True,
invoke_args=[self.conf, self.storage]) invoke_args=[self.conf,
self.storage,
self.cache])
return mgr.driver return mgr.driver
except RuntimeError as exc: except RuntimeError as exc:
LOG.exception(exc) LOG.exception(exc)

View File

@ -14,6 +14,7 @@
# limitations under the License. # limitations under the License.
import abc import abc
import six
from oslo.config import cfg from oslo.config import cfg
@ -22,17 +23,22 @@ _TRANSPORT_OPTIONS = [
] ]
@six.add_metaclass(abc.ABCMeta)
class DriverBase(object): class DriverBase(object):
"""Base class for Transport Drivers to document the expected interface. """Base class for Transport Drivers to document the expected interface.
:param conf: configuration instance
:type conf: oslo.config.cfg.CONF
:param storage: The storage driver :param storage: The storage driver
:type storage: marconi.queues.storage.base.DriverBase
:param cache: caching object
:type cache: marconi.common.cache.backends.BaseCache
""" """
__metaclass__ = abc.ABCMeta def __init__(self, conf, storage, cache):
def __init__(self, conf, storage):
self._conf = conf self._conf = conf
self._storage = storage self._storage = storage
self._cache = cache
self._conf.register_opts(_TRANSPORT_OPTIONS) self._conf.register_opts(_TRANSPORT_OPTIONS)

View File

@ -3,4 +3,4 @@
from marconi.queues.transport.wsgi import driver from marconi.queues.transport.wsgi import driver
# Hoist into package namespace # Hoist into package namespace
Driver = driver.Driver Driver = driver.DriverBase

View File

@ -12,12 +12,22 @@
# implied. # implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
"""marconi-queues (admin): interface for managing partitions."""
"""health: returns the health information for this proxy.""" from marconi.common.transport.wsgi import health
from marconi.queues.transport.wsgi import driver
import falcon from marconi.queues.transport.wsgi.public import driver as public_driver
class Resource(object): class Driver(driver.DriverBase):
def on_get(self, request, response):
response.status = falcon.HTTP_204 def __init__(self, conf, storage, cache):
super(Driver, self).__init__(conf, storage, cache)
self.public = public_driver.Driver(conf, storage, cache)
@property
def bridge(self):
return self.public.bridge + [
('/health',
health.Resource())
]

View File

@ -18,7 +18,7 @@
This app should be used by external WSGI This app should be used by external WSGI
containers. For example: containers. For example:
$ gunicorn marconi.transport.wsgi.app:app $ gunicorn marconi.queues.transport.wsgi.public.app:app
NOTE: As for external containers, it is necessary NOTE: As for external containers, it is necessary
to put config files in the standard paths. There's to put config files in the standard paths. There's

View File

@ -13,24 +13,19 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import abc
import functools import functools
from wsgiref import simple_server from wsgiref import simple_server
import falcon import falcon
from oslo.config import cfg from oslo.config import cfg
import six
from marconi.common.transport import version
from marconi.common.transport.wsgi import helpers from marconi.common.transport.wsgi import helpers
import marconi.openstack.common.log as logging import marconi.openstack.common.log as logging
from marconi.queues import transport from marconi.queues import transport
from marconi.queues.transport import auth from marconi.queues.transport import auth, validation
from marconi.queues.transport import validation
from marconi.queues.transport.wsgi import claims
from marconi.queues.transport.wsgi import health
from marconi.queues.transport.wsgi import messages
from marconi.queues.transport.wsgi import metadata
from marconi.queues.transport.wsgi import queues
from marconi.queues.transport.wsgi import stats
from marconi.queues.transport.wsgi import v1
_WSGI_OPTIONS = [ _WSGI_OPTIONS = [
cfg.StrOpt('bind', default='127.0.0.1', cfg.StrOpt('bind', default='127.0.0.1',
@ -48,15 +43,17 @@ _WSGI_GROUP = 'queues:drivers:transport:wsgi'
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class Driver(transport.DriverBase): @six.add_metaclass(abc.ABCMeta)
class DriverBase(transport.DriverBase):
def __init__(self, conf, storage): def __init__(self, conf, storage, cache):
super(Driver, self).__init__(conf, storage) super(DriverBase, self).__init__(conf, storage, cache)
self._conf.register_opts(_WSGI_OPTIONS, group=_WSGI_GROUP) self._conf.register_opts(_WSGI_OPTIONS, group=_WSGI_GROUP)
self._wsgi_conf = self._conf[_WSGI_GROUP] self._wsgi_conf = self._conf[_WSGI_GROUP]
self._validate = validation.Validator(self._conf) self._validate = validation.Validator(self._conf)
self.app = None
self._init_routes() self._init_routes()
self._init_middleware() self._init_middleware()
@ -72,57 +69,9 @@ class Driver(transport.DriverBase):
] ]
self.app = falcon.API(before=before_hooks) self.app = falcon.API(before=before_hooks)
version_path = version.path()
queue_controller = self._storage.queue_controller for route, resource in self.bridge:
message_controller = self._storage.message_controller self.app.add_route(version_path + route, resource)
claim_controller = self._storage.claim_controller
# Home
self.app.add_route('/v1', v1.V1Resource())
# Queues Endpoints
queue_collection = queues.CollectionResource(self._validate,
queue_controller)
self.app.add_route('/v1/queues', queue_collection)
queue_item = queues.ItemResource(queue_controller, message_controller)
self.app.add_route('/v1/queues/{queue_name}', queue_item)
stats_endpoint = stats.Resource(queue_controller)
self.app.add_route('/v1/queues/{queue_name}'
'/stats', stats_endpoint)
# Metadata Endpoints
metadata_endpoint = metadata.Resource(self._wsgi_conf, self._validate,
queue_controller)
self.app.add_route('/v1/queues/{queue_name}'
'/metadata', metadata_endpoint)
# Messages Endpoints
msg_collection = messages.CollectionResource(self._wsgi_conf,
self._validate,
message_controller)
self.app.add_route('/v1/queues/{queue_name}'
'/messages', msg_collection)
msg_item = messages.ItemResource(message_controller)
self.app.add_route('/v1/queues/{queue_name}'
'/messages/{message_id}', msg_item)
# Claims Endpoints
claim_collection = claims.CollectionResource(self._wsgi_conf,
self._validate,
claim_controller)
self.app.add_route('/v1/queues/{queue_name}'
'/claims', claim_collection)
claim_item = claims.ItemResource(self._wsgi_conf, self._validate,
claim_controller)
self.app.add_route('/v1/queues/{queue_name}'
'/claims/{claim_id}', claim_item)
# Health
self.app.add_route('/v1/health', health.HealthResource())
def _init_middleware(self): def _init_middleware(self):
"""Initialize WSGI middlewarez.""" """Initialize WSGI middlewarez."""
@ -132,6 +81,17 @@ class Driver(transport.DriverBase):
strategy = auth.strategy(self._conf.auth_strategy) strategy = auth.strategy(self._conf.auth_strategy)
self.app = strategy.install(self.app, self._conf) self.app = strategy.install(self.app, self._conf)
@abc.abstractproperty
def bridge(self):
"""Constructs a list of route/responder pairs that can be used to
establish the functionality of this driver.
Note: the routes should be unversioned.
:rtype: [(str, falcon-compatible responser)]
"""
raise NotImplementedError
def listen(self): def listen(self):
"""Self-host using 'bind' and 'port' from the WSGI config group.""" """Self-host using 'bind' and 'port' from the WSGI config group."""

View File

@ -0,0 +1,75 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""marconi-queues (public): handles all the routes for queuing,
messaging, and claiming.
"""
from marconi.common.transport.wsgi import health
from marconi.queues.transport.wsgi import (
claims, driver, messages, metadata, queues, stats, v1,
)
class Driver(driver.DriverBase):
def __init__(self, conf, storage, cache):
super(Driver, self).__init__(conf, storage, cache)
@property
def bridge(self):
queue_controller = self._storage.queue_controller
message_controller = self._storage.message_controller
claim_controller = self._storage.claim_controller
return [
# Home
('/',
v1.V1Resource()),
# Queues Endpoints
('/queues',
queues.CollectionResource(self._validate,
queue_controller)),
('/queues/{queue_name}',
queues.ItemResource(queue_controller,
message_controller)),
('/queues/{queue_name}/stats',
stats.Resource(queue_controller)),
('/queues/{queue_name}/metadata',
metadata.Resource(self._wsgi_conf, self._validate,
queue_controller)),
# Messages Endpoints
('/queues/{queue_name}/messages',
messages.CollectionResource(self._wsgi_conf,
self._validate,
message_controller)),
('/queues/{queue_name}/messages/{message_id}',
messages.ItemResource(message_controller)),
# Claims Endpoints
('/queues/{queue_name}/claims',
claims.CollectionResource(self._wsgi_conf,
self._validate,
claim_controller)),
('/queues/{queue_name}/claims/{claim_id}',
claims.ItemResource(self._wsgi_conf,
self._validate,
claim_controller)),
# Health
('/health',
health.Resource())
]

View File

@ -30,8 +30,11 @@ marconi.queues.storage =
sqlite = marconi.queues.storage.sqlite.driver:Driver sqlite = marconi.queues.storage.sqlite.driver:Driver
mongodb = marconi.queues.storage.mongodb.driver:Driver mongodb = marconi.queues.storage.mongodb.driver:Driver
marconi.queues.transport = marconi.queues.public.transport =
wsgi = marconi.queues.transport.wsgi.driver:Driver wsgi = marconi.queues.transport.wsgi.public.driver:Driver
marconi.queues.admin.transport =
wsgi = marconi.queues.transport.wsgi.admin.driver:Driver
marconi.common.cache.backends = marconi.common.cache.backends =
memory = marconi.common.cache._backends.memory:MemoryBackend memory = marconi.common.cache._backends.memory:MemoryBackend

View File

@ -16,10 +16,9 @@
from falcon import testing as ftest from falcon import testing as ftest
from marconi.common.transport import version
from marconi.proxy.admin import bootstrap as admin from marconi.proxy.admin import bootstrap as admin
from marconi.proxy.transport.wsgi import ( from marconi.proxy.transport.wsgi import queues
queues, version
)
from marconi.proxy.utils import round_robin from marconi.proxy.utils import round_robin
from tests.unit.queues.transport.wsgi import base from tests.unit.queues.transport.wsgi import base

View File

@ -17,7 +17,7 @@
from falcon import testing as ftest from falcon import testing as ftest
from oslo.config import cfg from oslo.config import cfg
import marconi.queues from marconi.queues import bootstrap
from marconi.queues.transport.wsgi import driver from marconi.queues.transport.wsgi import driver
from marconi import tests as testing from marconi import tests as testing
from marconi.tests import faulty_storage from marconi.tests import faulty_storage
@ -39,7 +39,7 @@ class TestBase(testing.TestBase):
group=driver._WSGI_GROUP) group=driver._WSGI_GROUP)
self.wsgi_cfg = conf[driver._WSGI_GROUP] self.wsgi_cfg = conf[driver._WSGI_GROUP]
self.boot = marconi.Bootstrap(conf) self.boot = bootstrap.Bootstrap(conf)
self.app = self.boot.transport.app self.app = self.boot.transport.app
self.srmock = ftest.StartResponseMock() self.srmock = ftest.StartResponseMock()
@ -100,11 +100,11 @@ class TestBaseFaulty(TestBase):
"""This test ensures we aren't letting any exceptions go unhandled.""" """This test ensures we aren't letting any exceptions go unhandled."""
def setUp(self): def setUp(self):
self._storage_backup = marconi.Bootstrap.storage self._storage_backup = bootstrap.Bootstrap.storage
faulty = faulty_storage.Driver(cfg.ConfigOpts()) faulty = faulty_storage.Driver(cfg.ConfigOpts())
setattr(marconi.Bootstrap, 'storage', faulty) setattr(bootstrap.Bootstrap, 'storage', faulty)
super(TestBaseFaulty, self).setUp() super(TestBaseFaulty, self).setUp()
def tearDown(self): def tearDown(self):
setattr(marconi.Bootstrap, 'storage', self._storage_backup) setattr(bootstrap.Bootstrap, 'storage', self._storage_backup)
super(TestBaseFaulty, self).tearDown() super(TestBaseFaulty, self).tearDown()

View File

@ -14,7 +14,7 @@
# limitations under the License. # limitations under the License.
from marconi.common import exceptions from marconi.common import exceptions
import marconi.queues from marconi.queues import bootstrap
from marconi.queues.storage import pipeline from marconi.queues.storage import pipeline
from marconi.queues.storage import sharding from marconi.queues.storage import sharding
from marconi.queues.storage import sqlite from marconi.queues.storage import sqlite
@ -26,12 +26,12 @@ class TestBootstrap(base.TestBase):
def _bootstrap(self, conf_file): def _bootstrap(self, conf_file):
conf = self.load_conf(conf_file) conf = self.load_conf(conf_file)
return marconi.Bootstrap(conf) return bootstrap.Bootstrap(conf)
def test_storage_invalid(self): def test_storage_invalid(self):
bootstrap = self._bootstrap('etc/drivers_storage_invalid.conf') boot = self._bootstrap('etc/drivers_storage_invalid.conf')
self.assertRaises(exceptions.InvalidDriver, self.assertRaises(exceptions.InvalidDriver,
lambda: bootstrap.storage) lambda: boot.storage)
def test_storage_sqlite(self): def test_storage_sqlite(self):
bootstrap = self._bootstrap('etc/wsgi_sqlite.conf') bootstrap = self._bootstrap('etc/wsgi_sqlite.conf')
@ -44,9 +44,9 @@ class TestBootstrap(base.TestBase):
self.assertIsInstance(bootstrap.storage._storage, sharding.Driver) self.assertIsInstance(bootstrap.storage._storage, sharding.Driver)
def test_transport_invalid(self): def test_transport_invalid(self):
bootstrap = self._bootstrap('etc/drivers_transport_invalid.conf') boot = self._bootstrap('etc/drivers_transport_invalid.conf')
self.assertRaises(exceptions.InvalidDriver, self.assertRaises(exceptions.InvalidDriver,
lambda: bootstrap.transport) lambda: boot.transport)
def test_transport_wsgi(self): def test_transport_wsgi(self):
bootstrap = self._bootstrap('etc/wsgi_sqlite.conf') bootstrap = self._bootstrap('etc/wsgi_sqlite.conf')