proxy: adds oslo.cache, config support, & stevedore

This patch brings together oslo.cache, oslo.config, and stevedore to
provide pluggable, hierarchical catalogue caching for marconi proxy.

Here's the list of changes:
- add configuration/driver loading at the app level
- remove select from proxy storage driver - unnecessary intelligence
  at storage layer
- node.weighted_select -> partition.weighted_select (clearer name)
- forwarding logic further refactored, placed in own module
- caching logic placed in lookup module
- selector passed down at app level to handle round-robin state
  globally
    * open to becoming configurable
- adds several TODOs for a better proxy

Change-Id: I3bc568315e685486d63cdce3ec278c89e3f2b2bc
Implements: blueprint placement-service
This commit is contained in:
Alejandro Cabrera 2013-09-19 14:08:50 -04:00 committed by Gerrit Code Review
parent 8d4acc5f2e
commit ca26627ec4
14 changed files with 291 additions and 187 deletions

View File

@ -26,6 +26,10 @@ transport = wsgi
# Storage driver module (e.g., mongodb, sqlite)
storage = mongodb
# transport and storage drivers for use with marconi-proxy
[drivers:proxy]
storage = mongodb
[drivers:transport:wsgi]
;bind = 0.0.0.0
;port = 8888
@ -68,6 +72,10 @@ database = marconi
# files will be.
;gc_threshold = 1000
[drivers:proxy:storage:mongodb]
uri = mongodb://localhost:27017
database = marconi_proxy
[limits:transport]
# The maximum number of queue records per page when listing queues
;queue_paging_uplimit = 20
@ -90,3 +98,8 @@ database = marconi
;default_queue_paging = 10
# The default number of messages per page when listing or claiming messages
;default_message_paging = 10
# caching mechanism
[oslo_cache]
cache_backend = memory
;cache_prefix = my_prefix

View File

@ -19,18 +19,17 @@ Supports the following operator API:
- [PUT|GET|DELETE] /v1/partitions/{partition}
- [GET] /v1/catalogue
Deploy requirements:
- redis-server, default port
- gunicorn
- python >= 2.7
- falcon
- msgpack
- requests
Running:
- configure marconi.conf appropriately
- gunicorn marconi.proxy.app:app
"""
import falcon
from oslo.config import cfg
from stevedore import driver
from marconi.common.cache import cache
from marconi.common import config
from marconi.common import exceptions
from marconi.proxy.resources import catalogue
from marconi.proxy.resources import forward
@ -39,20 +38,41 @@ from marconi.proxy.resources import metadata
from marconi.proxy.resources import partitions
from marconi.proxy.resources import queues
from marconi.proxy.resources import v1
from marconi.proxy.utils import round_robin
# TODO(cpp-cabrera): migrate to oslo.config/stevedore
# to stop hard coding the driver
from marconi.proxy.storage.memory import driver as memory_driver
# TODO(cpp-cabrera): wrap all this up in a nice bootstrap.py
# TODO(cpp-cabrera): mirror marconi.queues.transport with this
# for nicer deployments (and eventual
# proxy multi-transport support!)
PROJECT_CFG = config.project('marconi')
CFG = config.namespace('drivers:proxy').from_options(
transport='wsgi',
storage='memory')
# TODO(cpp-cabrera): need to wrap this in a bootstrap class to defer
# loading of config until it is run in a WSGI
# context, otherwise, it breaks the test suite.
if __name__ == '__main__':
PROJECT_CFG.load()
app = falcon.API()
driver = memory_driver.Driver()
catalogue_driver = driver.catalogue_controller
partitions_driver = driver.partitions_controller
try:
storage = driver.DriverManager('marconi.proxy.storage',
CFG.storage,
invoke_on_load=True)
except RuntimeError as exc:
raise exceptions.InvalidDriver(exc)
catalogue_driver = storage.driver.catalogue_controller
partitions_driver = storage.driver.partitions_controller
cache_driver = cache.get_cache(cfg.CONF)
selector = round_robin.Selector()
# TODO(cpp-cabrera): don't encode API version in routes -
# let's handle this elsewhere
# TODO(cpp-cabrera): bring in controllers based on config
# NOTE(cpp-cabrera): Proxy-specific routes
app.add_route('/v1/partitions',
partitions.Listing(partitions_driver))
@ -62,27 +82,46 @@ app.add_route('/v1/catalogue',
catalogue.Listing(catalogue_driver))
app.add_route('/v1/catalogue/{queue}',
catalogue.Resource(catalogue_driver))
app.add_route('/v1/health',
health.Resource())
# NOTE(cpp-cabrera): queue handling routes
app.add_route('/v1/queues',
queues.Listing(catalogue_driver))
app.add_route('/v1/queues/{queue}',
queues.Resource(partitions_driver, catalogue_driver))
queues.Resource(partitions_driver, catalogue_driver,
cache_driver, selector))
# NOTE(cpp-cabrera): Marconi forwarded routes
app.add_route('/v1',
v1.Resource(partitions_driver))
app.add_route('/v1/health',
health.Resource())
# NOTE(cpp-cabrera): Marconi forwarded routes involving a queue
app.add_route('/v1/queues/{queue}/claims',
forward.ClaimCreate(partitions_driver, catalogue_driver))
forward.ClaimCreate(partitions_driver,
catalogue_driver,
cache_driver, selector))
app.add_route('/v1/queues/{queue}/claims/{cid}',
forward.Claim(partitions_driver, catalogue_driver))
forward.Claim(partitions_driver,
catalogue_driver,
cache_driver, selector))
app.add_route('/v1/queues/{queue}/messages',
forward.MessageBulk(partitions_driver, catalogue_driver))
forward.MessageBulk(partitions_driver,
catalogue_driver,
cache_driver, selector))
app.add_route('/v1/queues/{queue}/messages/{mid}',
forward.Message(partitions_driver, catalogue_driver))
forward.Message(partitions_driver,
catalogue_driver, cache_driver, selector))
app.add_route('/v1/queues/{queue}/stats',
forward.Stats(partitions_driver, catalogue_driver))
forward.Stats(partitions_driver,
catalogue_driver,
cache_driver, selector))
app.add_route('/v1/queues/{queue}/metadata',
metadata.Resource(partitions_driver, catalogue_driver))
metadata.Resource(partitions_driver,
catalogue_driver,
cache_driver, selector))

View File

@ -15,81 +15,49 @@
"""forward: a resource for each marconi route where the desired result
is to just pass along a request to marconi.
"""
import falcon
from marconi.proxy.storage import exceptions
from marconi.proxy.utils import helpers
from marconi.proxy.utils import http
from marconi.proxy.utils import forward
class ForwardMixin(object):
"""Implements falcon-compatible forwarding for resources."""
def __init__(self, partitions_controller, catalogue_controller,
methods):
"""Initializes a forwarding resource.
:param partitions_controller: talks to partitions storage
:param catalogue_controller: talks to catalogue storage
:param methods: [str] - allowed methods, e.g., ['get', 'post']
"""
self._catalogue = catalogue_controller
self._partitions = partitions_controller
for method in methods:
setattr(self, 'on_' + method, self.forward)
def forward(self, request, response, queue, **kwargs):
project = helpers.get_project(request)
# find the partition, round-robin the host
partition = None
try:
partition = self._catalogue.get(project, queue)['partition']
except exceptions.EntryNotFound:
raise falcon.HTTPNotFound()
host = self._partitions.select(partition)
# send the request, update the response
resp = helpers.forward(host, request)
response.status = http.status(resp.status_code)
response.body = resp.content
class ClaimCreate(ForwardMixin):
class ClaimCreate(forward.ForwardMixin):
"""Handler for the endpoint to post claims."""
def __init__(self, partitions_controller, catalogue_controller):
def __init__(self, partitions_controller, catalogue_controller,
cache, selector):
super(ClaimCreate, self).__init__(
partitions_controller, catalogue_controller,
methods=['post'])
partitions_controller, catalogue_controller, cache,
selector, methods=['post'])
class Claim(ForwardMixin):
class Claim(forward.ForwardMixin):
"""Handler for dealing with claims directly."""
def __init__(self, partitions_controller, catalogue_controller):
def __init__(self, partitions_controller, catalogue_controller,
cache, selector):
super(Claim, self).__init__(
partitions_controller, catalogue_controller,
methods=['patch', 'delete', 'get'])
partitions_controller, catalogue_controller, cache,
selector, methods=['patch', 'delete', 'get'])
class MessageBulk(ForwardMixin):
class MessageBulk(forward.ForwardMixin):
"""Handler for bulk message operations."""
def __init__(self, partitions_controller, catalogue_controller):
def __init__(self, partitions_controller, catalogue_controller,
cache, selector):
super(MessageBulk, self).__init__(
partitions_controller, catalogue_controller,
methods=['get', 'delete', 'post'])
partitions_controller, catalogue_controller, cache,
selector, methods=['get', 'delete', 'post'])
class Message(ForwardMixin):
class Message(forward.ForwardMixin):
"""Handler for individual messages."""
def __init__(self, partitions_controller, catalogue_controller):
def __init__(self, partitions_controller, catalogue_controller,
cache, selector):
super(Message, self).__init__(
partitions_controller, catalogue_controller,
methods=['get', 'delete'])
partitions_controller, catalogue_controller, cache,
selector, methods=['get', 'delete'])
class Stats(ForwardMixin):
class Stats(forward.ForwardMixin):
"""Handler for forwarding queue stats requests."""
def __init__(self, partitions_controller, catalogue_controller):
def __init__(self, partitions_controller, catalogue_controller,
cache, selector):
super(Stats, self).__init__(
partitions_controller, catalogue_controller,
methods=['get'])
partitions_controller, catalogue_controller, cache,
selector, methods=['get'])

View File

@ -18,36 +18,17 @@ marconi queue metadata requests.
import io
import json
import falcon
from marconi.proxy.storage import exceptions
from marconi.proxy.utils import forward
from marconi.proxy.utils import helpers
from marconi.proxy.utils import http
class Resource(object):
def __init__(self, partitions_controller, catalogue_controller):
self._partitions = partitions_controller
self._catalogue = catalogue_controller
def _forward(self, request, response, queue):
project = helpers.get_project(request)
partition = None
try:
partition = self._catalogue.get(project, queue)['partition']
except exceptions.EntryNotFound:
raise falcon.HTTPNotFound()
host = self._partitions.select(partition)
resp = helpers.forward(host, request)
response.status = http.status(resp.status_code)
response.body = resp.content
return resp
def on_get(self, request, response, queue):
self._forward(request, response, queue)
class Resource(forward.ForwardMixin):
def __init__(self, partitions_controller,
catalogue_controller,
cache, selector):
super(Resource, self).__init__(
partitions_controller, catalogue_controller,
cache, selector, methods=['get'])
def on_put(self, request, response, queue):
project = helpers.get_project(request)
@ -55,7 +36,7 @@ class Resource(object):
# NOTE(cpp-cabrera): This is a hack to preserve the metadata
request.stream = io.BytesIO(data)
resp = self._forward(request, response, queue)
resp = self.forward(request, response, queue)
if resp.ok:
self._catalogue.update_metadata(project, queue,

View File

@ -17,7 +17,7 @@
The queues resource performs routing to a marconi partition for
requests targeting queues.
For the case of a queue listing, the prooxy handles the request in its
For the case of a queue listing, the proxy handles the request in its
entirety, since queues for a given project may be spread across
multiple partitions. This requires the proxy catalogue being
consistent with the state of the entire deployment.
@ -32,10 +32,10 @@ import json
import falcon
from marconi.proxy.storage import exceptions
from marconi.proxy.utils import forward
from marconi.proxy.utils import helpers
from marconi.proxy.utils import http
from marconi.proxy.utils import node
from marconi.proxy.utils import partition
class Listing(object):
@ -85,60 +85,49 @@ class Listing(object):
response.body = json.dumps(resp, ensure_ascii=False)
class Resource(object):
def __init__(self, partitions_controller, catalogue_controller):
class Resource(forward.ForwardMixin):
def __init__(self, partitions_controller, catalogue_controller,
cache, selector):
self._partitions = partitions_controller
self._catalogue = catalogue_controller
def _rr(self, project, queue):
"""Returns the next host to use for a request."""
partition = None
try:
partition = self._catalogue.get(project, queue)['partition']
except exceptions.EntryNotFound:
raise falcon.HTTPNotFound()
return self._partitions.select(partition)
def on_get(self, request, response, queue):
project = helpers.get_project(request)
host = self._rr(project, queue)
resp = helpers.forward(host, request)
response.set_headers(resp.headers)
response.status = http.status(resp.status_code)
response.body = resp.content
super(Resource, self).__init__(partitions_controller,
catalogue_controller,
cache, selector,
methods=['get'])
def on_put(self, request, response, queue):
project = helpers.get_project(request)
if self._catalogue.exists(project, queue):
response.status = falcon.HTTP_204
return
"""Create a queue in the catalogue, then forwards to marconi.
partition = node.weighted_select(self._partitions.list())
if partition is None:
raise falcon.HTTPBadRequest(
This is the only time marconi proxy ever needs to select a
partition for a queue. The association is created in the
catalogue. This should also be the only time
partition.weighted_select is ever called.
:raises: InternalServerError - if no partitions are registered
"""
target = partition.weighted_select(self._partitions.list())
if target is None:
raise falcon.InternalServerError(
"No partitions registered",
"Register partitions before continuing"
"Contact the system administrator for more details."
)
host = partition['hosts'][0]
host = target['hosts'][0]
resp = helpers.forward(host, request)
# NOTE(cpp-cabrera): only catalogue a queue if a request is good
if resp.ok:
self._catalogue.insert(project, queue, partition['name'], host)
project = helpers.get_project(request)
self._catalogue.insert(project, queue, target['name'],
host)
response.status = http.status(resp.status_code)
response.body = resp.content
def on_delete(self, request, response, queue):
project = helpers.get_project(request)
host = self._rr(project, queue)
resp = helpers.forward(host, request)
resp = self.forward(request, response, queue)
# avoid deleting a queue if the request is bad
if resp.ok:
self._catalogue.delete(project, queue)
response.set_headers(resp.headers)
response.status = http.status(resp.status_code)

View File

@ -58,16 +58,6 @@ class PartitionsBase(ControllerBase):
"""
raise NotImplementedError
@abc.abstractmethod
def select(self, name):
"""Selects a node from the given partition. Any algorithm may be
used, though storage drivers are encouraged to use the
provided :see RoundRobin: implementation.
:param name: str - The name of a registered partition.
"""
raise NotImplementedError
@abc.abstractmethod
def get(self, name):
"""Retrieves the nodes and weight for a partition with this name.

View File

@ -16,7 +16,6 @@ import six
from marconi.proxy.storage import base
from marconi.proxy.storage import exceptions
from marconi.proxy.utils import round_robin
class PartitionsController(base.PartitionsBase):
@ -25,16 +24,11 @@ class PartitionsController(base.PartitionsBase):
self.driver.db['partitions'] = {}
self._col = self.driver.db['partitions']
self._rr = round_robin.Selector()
def list(self):
for entry in sorted(self._col.values(), key=lambda x: x['n']):
yield _normalize(entry)
def select(self, name):
partition = self.get(name)
return self._rr.next(partition['name'], partition['hosts'])
def get(self, name):
entry = None
try:

View File

@ -26,7 +26,6 @@ Schema:
from marconi.proxy.storage import base
from marconi.proxy.storage import exceptions
from marconi.proxy.utils import round_robin
from marconi.queues.storage.mongodb import utils
PARTITIONS_INDEX = [
@ -40,7 +39,6 @@ class PartitionsController(base.PartitionsBase):
self._col = self.driver.db['partitions']
self._col.ensure_index(PARTITIONS_INDEX, unique=True)
self._rr = round_robin.Selector()
@utils.raises_conn_error
def list(self):
@ -49,11 +47,6 @@ class PartitionsController(base.PartitionsBase):
for entry in cursor:
yield _normalize(entry)
@utils.raises_conn_error
def select(self, name):
partition = self.get(name)
return self._rr.next(partition['name'], partition['hosts'])
@utils.raises_conn_error
def get(self, name):
fields = {'n': 1, 'w': 1, 'h': 1, '_id': 0}

View File

@ -0,0 +1,76 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""forward: exposes a mixin class appropriate for forwarding requests."""
import falcon
from marconi.proxy.utils import helpers
from marconi.proxy.utils import http
from marconi.proxy.utils import lookup
class ForwardMixin(object):
"""Implements falcon-compatible forwarding for resources."""
def __init__(self, partitions_controller, catalogue_controller,
cache, selector, methods):
"""Initializes a forwarding resource.
:param partitions_controller: talks to partitions storage
:param catalogue_controller: talks to catalogue storage
:param cache: localized, fast lookups
:param selector: @see utils.round_robin - host selection order
:param methods: [text] - allowed methods, e.g., ['get', 'post']
"""
self._catalogue = catalogue_controller
self._partitions = partitions_controller
self._cache = cache
self._selector = selector
for method in methods:
assert method.lower() in http.METHODS
setattr(self, 'on_' + method.lower(), self.forward)
def forward(self, request, response, queue, **kwargs):
"""Forwards requests in a selector-driven fashion."""
project = helpers.get_project(request)
partition = lookup.partition(project, queue,
self._catalogue,
self._cache)
# NOTE(cpp-cabrera): we tried to look up a catalogue
# entry and it failed. This happens if the associated
# queue doesn't exist under that project.
if not partition:
raise falcon.HTTPNotFound()
hosts = lookup.hosts(partition, self._partitions, self._cache)
# NOTE(cpp-cabrera): we tried to look up a partition, and it
# failed. This only happens if a partition is deleted from
# the primary store between here and the last call.
if not hosts:
raise falcon.HTTPNotFound()
# round robin to choose the desired host
host = self._selector.next(partition, hosts)
# send the request, update the response
resp = helpers.forward(host, request)
response.status = http.status(resp.status_code)
response.body = resp.content
# NOTE(cpp-cabrera): in case responder must do more afterwards
return resp

View File

@ -16,6 +16,12 @@
import falcon
METHODS = (
'get', 'put', 'head', 'delete', 'post',
'patch', 'options'
)
_code_map = dict((int(v.split()[0]), v)
for k, v in falcon.status_codes.__dict__.items()
if k.startswith('HTTP_'))

View File

@ -0,0 +1,64 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""lookup: functions to handle caching/lookup of proxy details."""
import msgpack
from marconi.proxy.storage import exceptions
def partition(project, queue, catalogue_controller, cache):
"""Returns the name of the partition associated with this project.queue
:param project: text - the project namespace
:param queue: text - the name of the queue
:param catalogue_controller: primary storage for the queue catalogue
:param cache: cache for catalogue - updated if lookup fails
:returns: Maybe text - partition name or None if not found
"""
key = u'q.{project}.{queue}'.format(project=project, queue=queue)
name = cache.get(key)
if not name:
try:
name = catalogue_controller.get(project, queue)['partition']
except exceptions.EntryNotFound:
return None
cache.set(key, name)
return name
def hosts(name, partitions_controller, cache):
"""Returns the list of hosts associated with this partition.
:param name: text - the name of the partition to look up
:param partitions_controller: handler for primary storage
:param cache: cache to check first - updated if partition not found
:returns: Maybe [text] - list of hosts or None if not found
"""
key = u'p.{name}'.format(name=name)
data = cache.get(key)
hosts = None
if data:
hosts = msgpack.loads(data)
if not hosts:
try:
hosts = partitions_controller.get(name)['hosts']
except exceptions.PartitionNotFound:
return None
cache.set(key, msgpack.dumps(hosts))
return hosts

View File

@ -12,13 +12,12 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""node: utilities for implementing partition and node selections."""
"""partition: utilities for implementing partition selections."""
import random
def weighted_select(partitions):
"""Select a partition from all the partitions registered using a weighted
selection algorithm.
"""Select a partition from all the partitions passed in.
:param partitions: gen({'name': ..., 'weight': ..., 'hosts': ...}, ...)
:return: (name, hosts)

View File

@ -38,6 +38,10 @@ marconi.common.cache.backends =
memory = marconi.common.cache._backends.memory:MemoryBackend
memcached = marconi.common.cache._backends.memcached:MemcachedBackend
marconi.proxy.storage =
memory = marconi.proxy.storage.memory.driver:Driver
mongodb = marconi.proxy.storage.mongodb.driver:Driver
[nosetests]
where=tests
verbosity=2

View File

@ -96,18 +96,6 @@ class PartitionsControllerTest(ControllerBaseTest):
self._check_structure(p)
self._check_values(p, xname=n, xweight=w, xhosts=h)
def test_select(self):
name = self.name
with helpers.partition(self.controller, name, 10, ['a', 'b', 'c']):
for i in range(3):
self.assertEqual(self.controller.select(name), 'a')
self.assertEqual(self.controller.select(name), 'b')
self.assertEqual(self.controller.select(name), 'c')
def test_select_raises_with_no_partitions(self):
self.assertRaises(exceptions.PartitionNotFound,
self.controller.select, ('not_found'))
def test_get(self):
name = self.name
with helpers.partition(self.controller, name, 10, ['a']) as expect: