proxy: memory storage driver + tests + fixes

Adds a memory storage driver to marconi-proxy. This is ideal for
testing storage implementations.

Furthermore, unit tests are added for the storage layer. These tests
check the fundamental behavior of the proxy storage drivers and
mirrors the approach used by marconi/queues/storage.

The storage interface has also been updated. The old variant keep
round robin indices in storage. I found that this was a Bad Idea. This
patch removes this design flaw from the
implementation. Consequentially, many of the helper functions were
greatly simplified. See: utils/node:weighted_select and
utils/round_robin:Selector.

The last set of changes relate to the transport layer. They are as
follows:
- redis has been torn out of the transport
    * storage is no longer hard-coded
- partition and catalogue controllers are passed as needed to
  resources
- resources round-robin per request, rather than only once on queue
  allocation
- terminology update: nodes -> hosts

Change-Id: I78863f7437dd9eec011cdfe7962fd88be23a3975
Implements: blueprint placement-service
This commit is contained in:
Alejandro Cabrera 2013-09-19 11:01:17 -04:00
parent e96cee8b97
commit 804c116084
22 changed files with 803 additions and 380 deletions

View File

@ -31,7 +31,6 @@ Running:
- gunicorn marconi.proxy.app:app
"""
import falcon
import redis
from marconi.proxy.resources import catalogue
from marconi.proxy.resources import forward
@ -41,41 +40,49 @@ from marconi.proxy.resources import partitions
from marconi.proxy.resources import queues
from marconi.proxy.resources import v1
# TODO(cpp-cabrera): migrate to oslo.config/stevedore
# to stop hard coding the driver
from marconi.proxy.storage.memory import driver as memory_driver
app = falcon.API()
client = redis.StrictRedis()
driver = memory_driver.Driver()
catalogue_driver = driver.catalogue_controller
partitions_driver = driver.partitions_controller
# TODO(cpp-cabrera): don't encode API version in routes -
# let's handle this elsewhere
# TODO(cpp-cabrera): bring in controllers based on config
# NOTE(cpp-cabrera): Proxy-specific routes
app.add_route('/v1/partitions',
partitions.Listing(client))
partitions.Listing(partitions_driver))
app.add_route('/v1/partitions/{partition}',
partitions.Resource(client))
partitions.Resource(partitions_driver))
app.add_route('/v1/catalogue',
catalogue.Listing(client))
catalogue.Listing(catalogue_driver))
app.add_route('/v1/catalogue/{queue}',
catalogue.Resource(client))
catalogue.Resource(catalogue_driver))
# NOTE(cpp-cabrera): queue handling routes
app.add_route('/v1/queues',
queues.Listing(client))
queues.Listing(catalogue_driver))
app.add_route('/v1/queues/{queue}',
queues.Resource(client))
queues.Resource(partitions_driver, catalogue_driver))
# NOTE(cpp-cabrera): Marconi forwarded routes
app.add_route('/v1',
v1.Resource(client))
v1.Resource(partitions_driver))
app.add_route('/v1/health',
health.Resource(client))
health.Resource())
app.add_route('/v1/queues/{queue}/claims',
forward.ClaimCreate(client))
forward.ClaimCreate(partitions_driver, catalogue_driver))
app.add_route('/v1/queues/{queue}/claims/{cid}',
forward.Claim(client))
forward.Claim(partitions_driver, catalogue_driver))
app.add_route('/v1/queues/{queue}/messages',
forward.MessageBulk(client))
forward.MessageBulk(partitions_driver, catalogue_driver))
app.add_route('/v1/queues/{queue}/messages/{mid}',
forward.Message(client))
forward.Message(partitions_driver, catalogue_driver))
app.add_route('/v1/queues/{queue}/stats',
forward.Stats(client))
forward.Stats(partitions_driver, catalogue_driver))
app.add_route('/v1/queues/{queue}/metadata',
metadata.Resource(client))
metadata.Resource(partitions_driver, catalogue_driver))

View File

@ -12,57 +12,27 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""catalogue: maintains a directory of all queues proxied through the system
Storage maintains an entry for each queue as follows:
{
q.{project}.{queue}: {'h': ByteString, 'n': ByteString, 'm': MsgPack}
}
"m" -> metadata
"n" -> name
"h" -> HTTP host
A list of all queues is also stored as:
{
qs.{project}: [{name}, {name}, {name}]
}
"""catalogue: maintains a directory of all queues proxied through the system.
"""
import json
import falcon
import msgpack
from marconi.proxy.storage import exceptions
from marconi.proxy.utils import helpers
class Listing(object):
"""A listing of all entries in the catalogue."""
def __init__(self, client):
self.client = client
def __init__(self, catalogue_controller):
self._catalogue = catalogue_controller
def on_get(self, request, response):
project = helpers.get_project(request)
key = 'qs.%s' % project
if not self.client.exists(key):
response.status = falcon.HTTP_204
return
resp = {}
for q in self.client.lrange(key, 0, -1):
hkey = 'q.%s.%s' % (project, q.decode('utf8'))
queue = q.decode('utf8')
h, n, m = self.client.hmget(hkey, ['h', 'n', 'm'])
if not all([h, n]):
continue
resp[queue] = {
'host': h.decode('utf8'),
'name': n.decode('utf8')
}
resp[queue]['metadata'] = msgpack.loads(m) if m else {}
for q in self._catalogue.list(project):
resp[q['name']] = q
if not resp:
response.status = falcon.HTTP_204
@ -74,19 +44,17 @@ class Listing(object):
class Resource(object):
"""A single catalogue entry."""
def __init__(self, client):
self.client = client
def __init__(self, catalogue_controller):
self._catalogue = catalogue_controller
def on_get(self, request, response, queue):
key = 'q.%s.%s' % (helpers.get_project(request), queue)
if not self.client.exists(key):
project = helpers.get_project(request)
entry = None
try:
entry = self._catalogue.get(project, queue)
except exceptions.EntryNotFound:
raise falcon.HTTPNotFound()
h, n, m = self.client.hmget(key, ['h', 'n', 'm'])
resp = {
'name': n.decode('utf8'),
'host': h.decode('utf8'),
}
resp['metadata'] = msgpack.loads(m) if m else {}
resp = entry
response.status = falcon.HTTP_200
response.body = json.dumps(resp, ensure_ascii=False)

View File

@ -15,96 +15,81 @@
"""forward: a resource for each marconi route where the desired result
is to just pass along a request to marconi.
"""
import falcon
from marconi.proxy.storage import exceptions
from marconi.proxy.utils import helpers
from marconi.proxy.utils import http
# TODO(cpp-cabrera): Replace these with falcon.set_default_route
# once that lands for DRYer forwarding
class ForwardMixin(object):
"""Implements falcon-compatible forwarding for resources."""
def __init__(self, partitions_controller, catalogue_controller,
methods):
"""Initializes a forwarding resource.
:param partitions_controller: talks to partitions storage
:param catalogue_controller: talks to catalogue storage
:param methods: [str] - allowed methods, e.g., ['get', 'post']
"""
self._catalogue = catalogue_controller
self._partitions = partitions_controller
for method in methods:
setattr(self, 'on_' + method, self.forward)
def forward(self, request, response, queue, **kwargs):
project = helpers.get_project(request)
# find the partition, round-robin the host
partition = None
try:
partition = self._catalogue.get(project, queue)['partition']
except exceptions.EntryNotFound:
raise falcon.HTTPNotFound()
host = self._partitions.select(partition)
# send the request, update the response
resp = helpers.forward(host, request)
response.status = http.status(resp.status_code)
response.body = resp.content
class ClaimCreate(object):
class ClaimCreate(ForwardMixin):
"""Handler for the endpoint to post claims."""
def __init__(self, client):
self.client = client
def on_post(self, request, response, queue):
resp = helpers.forward(self.client, request, queue)
response.set_headers(resp.headers)
response.status = http.status(resp.status_code)
response.body = resp.content
def __init__(self, partitions_controller, catalogue_controller):
super(ClaimCreate, self).__init__(
partitions_controller, catalogue_controller,
methods=['post'])
class Claim(object):
class Claim(ForwardMixin):
"""Handler for dealing with claims directly."""
def __init__(self, client):
self.client = client
def _forward_claim(self, request, response, queue):
resp = helpers.forward(self.client, request, queue)
response.set_headers(resp.headers)
response.status = http.status(resp.status_code)
response.body = resp.content
def on_patch(self, request, response, queue, cid):
self._forward_claim(request, response, queue)
def on_delete(self, request, response, queue, cid):
self._forward_claim(request, response, queue)
def on_get(self, request, response, queue, cid):
self._forward_claim(request, response, queue)
def __init__(self, partitions_controller, catalogue_controller):
super(Claim, self).__init__(
partitions_controller, catalogue_controller,
methods=['patch', 'delete', 'get'])
class MessageBulk(object):
class MessageBulk(ForwardMixin):
"""Handler for bulk message operations."""
def __init__(self, client):
self.client = client
def _forward_message(self, request, response, queue):
resp = helpers.forward(self.client, request, queue)
response.set_headers(resp.headers)
response.status = http.status(resp.status_code)
response.body = resp.content
def on_get(self, request, response, queue):
self._forward_message(request, response, queue)
def on_delete(self, request, response, queue):
self._forward_message(request, response, queue)
def on_post(self, request, response, queue):
self._forward_message(request, response, queue)
def __init__(self, partitions_controller, catalogue_controller):
super(MessageBulk, self).__init__(
partitions_controller, catalogue_controller,
methods=['get', 'delete', 'post'])
class Message(object):
class Message(ForwardMixin):
"""Handler for individual messages."""
def __init__(self, client):
self.client = client
def _forward_message(self, request, response, queue):
resp = helpers.forward(self.client, request, queue)
response.set_headers(resp.headers)
response.status = http.status(resp.status_code)
response.body = resp.content
def on_get(self, request, response, queue, mid):
self._forward_message(request, response, queue)
def on_delete(self, request, response, queue, mid):
self._forward_message(request, response, queue)
def __init__(self, partitions_controller, catalogue_controller):
super(Message, self).__init__(
partitions_controller, catalogue_controller,
methods=['get', 'delete'])
class Stats(object):
class Stats(ForwardMixin):
"""Handler for forwarding queue stats requests."""
def __init__(self, client):
self.client = client
def _forward_stats(self, request, response, queue):
resp = helpers.forward(self.client, request, queue)
response.set_headers(resp.headers)
response.status = http.status(resp.status_code)
response.body = resp.content
def on_get(self, request, response, queue):
self._forward_stats(request, response, queue)
def __init__(self, partitions_controller, catalogue_controller):
super(Stats, self).__init__(
partitions_controller, catalogue_controller,
methods=['get'])

View File

@ -12,21 +12,10 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""health: queries the first node in the first partition for health
responses.
"""
import requests
from marconi.proxy.utils import helpers
from marconi.proxy.utils import http
"""health: returns the health information for this proxy."""
import falcon
class Resource(object):
def __init__(self, client):
self.client = client
def on_get(self, request, response):
node = helpers.get_first_host(self.client)
resp = requests.get(node + '/v1/health')
response.status = http.status(resp.status_code)
response.body = resp.content
response.status = falcon.HTTP_204

View File

@ -15,39 +15,48 @@
"""metadata: adds queue metadata to the catalogue and forwards to
marconi queue metadata requests.
"""
import falcon
import msgpack
import requests
import io
import json
import falcon
from marconi.proxy.storage import exceptions
from marconi.proxy.utils import helpers
from marconi.proxy.utils import http
class Resource(object):
def __init__(self, client):
self.client = client
def __init__(self, partitions_controller, catalogue_controller):
self._partitions = partitions_controller
self._catalogue = catalogue_controller
def _make_key(self, request, queue):
def _forward(self, request, response, queue):
project = helpers.get_project(request)
return 'q.%s.%s' % (project, queue)
def on_get(self, request, response, queue):
resp = helpers.forward(self.client, request, queue)
response.status = http.status(resp.status_code)
response.body = resp.content
def on_put(self, request, response, queue):
key = self._make_key(request, queue)
if not self.client.exists(key):
partition = None
try:
partition = self._catalogue.get(project, queue)['partition']
except exceptions.EntryNotFound:
raise falcon.HTTPNotFound()
resp = helpers.forward(self.client, request, queue)
host = self._partitions.select(partition)
resp = helpers.forward(host, request)
response.status = http.status(resp.status_code)
response.body = resp.content
return resp
def on_get(self, request, response, queue):
self._forward(request, response, queue)
def on_put(self, request, response, queue):
project = helpers.get_project(request)
data = request.stream.read()
# NOTE(cpp-cabrera): This is a hack to preserve the metadata
request.stream = io.BytesIO(data)
resp = self._forward(request, response, queue)
if resp.ok:
project = helpers.get_project(request)
host = helpers.get_host_by_project_and_queue(self.client,
project, queue)
resp = requests.get(host + '/v1/queues/%s/metadata' % queue)
self.client.hset(key, 'm', msgpack.dumps(resp.json()))
self._catalogue.update_metadata(project, queue,
json.loads(data))

View File

@ -21,42 +21,35 @@ following fields are required:
{
"name": String,
"weight": Integer,
"nodes": [HTTP_EndPoints(:Port), ...]
}
In storage, a partition entry looks like:
{
"p.{name}": {"n": ByteString, "w": ByteString, "n": MsgPack}
}
Storage also maintains a list of partitions as:
{
"ps": [{name}, {name}, {name}, ...]
"hosts": [HTTP_EndPoints(:Port), ...]
}
"""
import json
import falcon
import msgpack
from marconi.proxy.storage import exceptions
class Listing(object):
"""A listing of all partitions registered."""
def __init__(self, client):
self.client = client
def __init__(self, partitions_controller):
self._ctrl = partitions_controller
def on_get(self, request, response):
partitions = self.client.lrange('ps', 0, -1)
"""Returns a partition listing as a JSON object:
{
"name": {"weight": 100, "hosts": [""]},
"..."
}
:returns: HTTP | [200, 204]
"""
resp = {}
for p in partitions:
key = 'p.%s' % p.decode('utf8')
n, w = self.client.hmget(key, ['n', 'w'])
if not all([n, w]):
continue
resp[p.decode('utf8')] = {'weight': int(w),
'nodes': [node.decode('utf8') for node
in msgpack.loads(n)]}
for p in self._ctrl.list():
resp[p['name']] = {'weight': int(p['weight']),
'hosts': p['hosts']}
if not resp:
response.status = falcon.HTTP_204
@ -68,19 +61,26 @@ class Listing(object):
class Resource(object):
"""A means to interact with individual partitions."""
def __init__(self, client):
self.client = client
def __init__(self, partitions_controller):
self._ctrl = partitions_controller
def on_get(self, request, response, partition):
n, w = self.client.hmget('p.%s' % partition, ['n', 'w'])
"""Returns a JSON object for a single partition entry:
if not all([n, w]): # ensure all the data was returned correctly
{"weight": 100, "hosts": [""]}
:returns: HTTP | [200, 404]
"""
data = None
try:
data = self._ctrl.get(partition)
except exceptions.PartitionNotFound:
raise falcon.HTTPNotFound()
nodes, weight = msgpack.loads(n), int(w)
hosts, weight = data['hosts'], data['weight']
response.body = json.dumps({
'nodes': [node.decode('utf8') for node in nodes],
'weight': weight,
'hosts': data['hosts'],
'weight': data['weight'],
}, ensure_ascii=False)
def _validate_put(self, data):
@ -89,23 +89,22 @@ class Resource(object):
'Invalid metadata', 'Define a partition as a dict'
)
if 'nodes' not in data:
if 'hosts' not in data:
raise falcon.HTTPBadRequest(
'Missing nodes list', 'Provide a list of nodes'
'Missing hosts list', 'Provide a list of hosts'
)
if not data['nodes']:
if not data['hosts']:
raise falcon.HTTPBadRequest(
'Empty nodes list', 'Nodes list cannot be empty'
'Empty hosts list', 'Hosts list cannot be empty'
)
if not isinstance(data['nodes'], list):
if not isinstance(data['hosts'], list):
raise falcon.HTTPBadRequest(
'Invalid nodes', 'Nodes must be a list of URLs'
'Invalid hosts', 'Hosts must be a list of URLs'
)
# TODO(cpp-cabrera): check [str]
if 'weight' not in data:
raise falcon.HTTPBadRequest(
'Missing weight',
@ -118,13 +117,18 @@ class Resource(object):
)
def on_put(self, request, response, partition):
"""Creates a new partition. Expects the following input:
{"weight": 100, "hosts": [""]}
:returns: HTTP | [201, 204]
"""
if partition.startswith('_'):
raise falcon.HTTPBadRequest(
'Reserved name', '_names are reserved for internal use'
)
key = 'p.%s' % partition
if self.client.exists(key):
if self._ctrl.exists(partition):
response.status = falcon.HTTP_204
return
@ -136,13 +140,15 @@ class Resource(object):
)
self._validate_put(data)
self.client.hmset(key, {'n': msgpack.dumps(data['nodes']),
'w': data['weight'],
'c': 0})
self.client.rpush('ps', partition)
self._ctrl.create(partition,
weight=data['weight'],
hosts=data['hosts'])
response.status = falcon.HTTP_201
def on_delete(self, request, response, partition):
self.client.delete('p.%s' % partition)
self.client.lrem('ps', 1, partition)
"""Removes an existing partition.
:returns: HTTP | 204
"""
self._ctrl.delete(partition)
response.status = falcon.HTTP_204

View File

@ -31,9 +31,8 @@ import collections
import json
import falcon
import msgpack
import requests
from marconi.proxy.storage import exceptions
from marconi.proxy.utils import helpers
from marconi.proxy.utils import http
from marconi.proxy.utils import node
@ -43,15 +42,14 @@ class Listing(object):
"""Responsible for constructing a valid marconi queue listing
from the content stored in the catalogue.
"""
def __init__(self, client):
self.client = client
def __init__(self, catalogue_controller):
self._catalogue = catalogue_controller
#TODO(cpp-cabrera): consider revisiting this implementation
# to use concurrent requests + merge/sort
# for great impl./data DRYness
def on_get(self, request, response):
project = helpers.get_project(request)
key = 'qs.%s' % project
if not self.client.exists(key):
response.status = falcon.HTTP_204
return
kwargs = {}
request.get_param('marker', store=kwargs)
@ -59,22 +57,19 @@ class Listing(object):
request.get_param_as_bool('detailed', store=kwargs)
resp = collections.defaultdict(list)
for q in sorted(self.client.lrange(key, 0, -1)):
queue = q.decode('utf8')
if queue < kwargs.get('marker', 0):
for q in self._catalogue.list(project):
queue = q['name']
if queue < kwargs.get('marker', ''):
continue
entry = {
'href': request.path + '/' + queue,
'name': queue
}
if kwargs.get('detailed', None):
qkey = 'q.%s.%s' % (project, queue)
data = self.client.hget(qkey, 'm')
metadata = msgpack.loads(data)
entry['metadata'] = metadata
entry['metadata'] = queue['metadata']
resp['queues'].append(entry)
kwargs['marker'] = queue
if len(resp['queues']) == kwargs.get('limit', None):
if len(resp['queues']) == kwargs.get('limit', 0):
break
if not resp:
@ -91,59 +86,59 @@ class Listing(object):
class Resource(object):
def __init__(self, client):
self.client = client
def __init__(self, partitions_controller, catalogue_controller):
self._partitions = partitions_controller
self._catalogue = catalogue_controller
def _make_key(self, request, queue):
project = helpers.get_project(request)
return 'q.%s.%s' % (project, queue)
def _rr(self, project, queue):
"""Returns the next host to use for a request."""
partition = None
try:
partition = self._catalogue.get(project, queue)['partition']
except exceptions.EntryNotFound:
raise falcon.HTTPNotFound()
return self._partitions.select(partition)
def on_get(self, request, response, queue):
key = self._make_key(request, queue)
if not self.client.exists(key):
raise falcon.HTTPNotFound()
project = helpers.get_project(request)
host = self._rr(project, queue)
resp = helpers.forward(host, request)
h, n = self.client.hmget(key, ['h', 'n'])
if not (h and n):
raise falcon.HTTPNotFound()
resp = helpers.forward(self.client, request, queue)
response.set_headers(resp.headers)
response.status = http.status(resp.status_code)
response.body = resp.content
def on_put(self, request, response, queue):
key = self._make_key(request, queue)
project = helpers.get_project(request)
if self.client.exists(key):
if self._catalogue.exists(project, queue):
response.status = falcon.HTTP_204
return
partition = node.weighted_select(self.client)
host = node.round_robin(self.client, partition)
url = '{host}/v1/queues/{queue}'.format(host=host, queue=queue)
resp = requests.put(url, headers=request._headers)
partition = node.weighted_select(self._partitions.list())
if partition is None:
raise falcon.HTTPBadRequest(
"No partitions registered",
"Register partitions before continuing"
)
host = partition['hosts'][0]
resp = helpers.forward(host, request)
# NOTE(cpp-cabrera): only catalogue a queue if a request is good
if resp.ok:
self.client.hmset(key, {
'h': host,
'n': queue
})
self.client.rpush('qs.%s' % project, queue)
self._catalogue.insert(project, queue, partition['name'], host)
response.status = http.status(resp.status_code)
response.body = resp.content
def on_delete(self, request, response, queue):
key = self._make_key(request, queue)
project = helpers.get_project(request)
resp = helpers.forward(self.client, request, queue)
response.set_headers(resp.headers)
response.status = http.status(resp.status_code)
host = self._rr(project, queue)
resp = helpers.forward(host, request)
# avoid deleting a queue if the request is bad
if not resp.ok:
self.client.hdel(key, queue)
self.client.lrem('qs.%s' % project, 1, queue)
if resp.ok:
self._catalogue.delete(project, queue)
response.set_headers(resp.headers)
response.status = http.status(resp.status_code)

View File

@ -13,18 +13,28 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""v1: queries the first node in the first partition for a homedoc."""
import requests
import falcon
from marconi.proxy.utils import helpers
from marconi.proxy.utils import http
class Resource(object):
def __init__(self, client):
self.client = client
def __init__(self, partitions_controller):
self._partitions = partitions_controller
def on_get(self, request, response):
node = helpers.get_first_host(self.client)
resp = requests.get(node + '/v1')
partition = None
try:
partition = next(self._partitions.list())
except StopIteration:
raise falcon.HTTPServiceUnavailable(
"No partitions found",
"Register some partitions",
retry_after=120
)
host = partition['hosts'][0]
resp = helpers.forward(host, request)
response.status = http.status(resp.status_code)
response.body = resp.content

View File

@ -1,6 +1,7 @@
"""Marconi proxy storage drivers"""
from marconi.proxy.storage import base
from marconi.proxy.storage import exceptions # NOQA
# NOTE(cpp-cabrera): Hoist classes into package namespace

View File

@ -59,15 +59,12 @@ class PartitionsBase(ControllerBase):
raise NotImplementedError
@abc.abstractmethod
def select(self):
"""Selects a node from one of the registered partitions. First,
a partition is selected by taking into account the partition weights.
Then, from the selected partition, the node at the current round robin
index is selected. Finally, the node URL is returned after incrementing
the round robin index.
def select(self, name):
"""Selects a node from the given partition. Any algorithm may be
used, though storage drivers are encouraged to use the
provided :see RoundRobin: implementation.
:returns: A node URL
:raises: NoPartitionsRegistered
:param name: str - The name of a registered partition.
"""
raise NotImplementedError
@ -80,6 +77,14 @@ class PartitionsBase(ControllerBase):
"""
raise NotImplementedError
@abc.abstractmethod
def exists(self, name):
"""Determines whether the given partition exists
:param name: str - Partition name to check for
:return: True | False
"""
@abc.abstractmethod
def create(self, name, weight, nodes):
"""Registers a new partition.
@ -113,16 +118,12 @@ class CatalogueBase(ControllerBase):
"""
@abc.abstractmethod
def list(self, project, include_metadata=False, include_location=False):
def list(self, project):
"""Returns a list of queue entries from the catalogue associated with
this project.
:param project: The project to use when filtering through queue
entries.
:param include_metadata: should the returned list include queue
metadata?
:param include_location: should the returned list include queue
location URLs?
:returns: a list of dicts: [{'name': ., 'location': ., 'metadata': .}]
"""
raise NotImplementedError
@ -140,12 +141,21 @@ class CatalogueBase(ControllerBase):
raise NotImplementedError
@abc.abstractmethod
def insert(self, project, queue, location, metadata={}):
def exists(self, project, queue):
"""Determines whether the given queue exists under project.
:param project: str - Namespace to check.
:param queue: str - Particular queue to check for
:return: True | False
"""
@abc.abstractmethod
def insert(self, project, queue, partition, metadata={}):
"""Creates a new catalogue entry.
:param project: Namespace to insert the given queue into
:param queue: The name of the queue to insert
:param location: The URL of the node where this queue is being stored
:param project: str - Namespace to insert the given queue into
:param queue: str - The name of the queue to insert
:param partition: str - Partition name where this queue is stored
:param metadata: A dictionary of metadata for this queue
"""
raise NotImplementedError
@ -159,15 +169,6 @@ class CatalogueBase(ControllerBase):
"""
raise NotImplementedError
@abc.abstractmethod
def location(self, project, queue):
"""Returns the location URL for this queue.
:param project: The namespace to search for this queue
:param queue: The name of the queue
"""
raise NotImplementedError
@abc.abstractmethod
def update_metadata(self, project, queue, metadata):
"""Updates the metadata associated with this queue.
@ -177,13 +178,3 @@ class CatalogueBase(ControllerBase):
:param metadata: A dictionary of metadata for this queue
"""
raise NotImplementedError
@abc.abstractmethod
def move(self, project, queue, location):
"""Changes the location for this queue.
:param project: Namespace to search
:param queue: The name of the queue
:param location: The URL of the node where this queue will be stored
"""
raise NotImplementedError

View File

@ -39,13 +39,3 @@ class PartitionNotFound(NotFound):
name=name
)
super(PartitionNotFound, self).__init__(msg)
class NoPartitionsRegistered(Exception):
"""An exception that is raised when attempting to select a node from
all registered partitions, except that no partitions are
registered.
"""
def __init__(self):
msg = 'No partitions are registered.'
super(NoPartitionsRegistered, self).__init__(msg)

View File

@ -0,0 +1,20 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Memory Proxy Storage Driver for Marconi"""
from marconi.proxy.storage.memory import driver
# Hoist classes into package namespace
Driver = driver.Driver

View File

@ -0,0 +1,74 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import six
from marconi.proxy.storage import base
from marconi.proxy.storage import exceptions
def _idx(project, queue):
return project + '.' + queue
class CatalogueController(base.CatalogueBase):
def __init__(self, *args, **kwargs):
super(CatalogueController, self).__init__(*args, **kwargs)
self.driver.db['catalogue'] = {}
self._col = self.driver.db['catalogue']
def list(self, project):
for entry in sorted(self._col.values(), key=lambda x: x['q']):
yield _normalize(entry)
def get(self, project, queue):
entry = None
try:
entry = self._col[_idx(project, queue)]
except KeyError:
raise exceptions.EntryNotFound(project, queue)
return _normalize(entry)
def exists(self, project, queue):
return self._col.get(_idx(project, queue)) is not None
def insert(self, project, queue, partition, host, metadata={}):
self._col[_idx(project, queue)] = {
'p': project, 'q': queue,
'n': partition, 'h': host, 'm': metadata
}
def delete(self, project, queue):
try:
del self._col[_idx(project, queue)]
except KeyError:
pass
def update_metadata(self, project, queue, metadata):
try:
self._col[_idx(project, queue)]['m'] = metadata
except KeyError:
pass
def _normalize(entry):
return {
'name': six.text_type(entry['q']),
'metadata': entry['m'],
'partition': six.text_type(entry['n']),
'host': six.text_type(entry['h']),
}

View File

@ -0,0 +1,20 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from marconi.proxy.storage.memory import catalogue
from marconi.proxy.storage.memory import partitions
CatalogueController = catalogue.CatalogueController
PartitionsController = partitions.PartitionsController

View File

@ -0,0 +1,34 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from marconi.proxy.storage import base
from marconi.proxy.storage.memory import controllers
class Driver(base.DriverBase):
def __init__(self):
self._db = {}
@property
def db(self):
return self._db
@property
def partitions_controller(self):
return controllers.PartitionsController(self)
@property
def catalogue_controller(self):
return controllers.CatalogueController(self)

View File

@ -0,0 +1,67 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import six
from marconi.proxy.storage import base
from marconi.proxy.storage import exceptions
from marconi.proxy.utils import round_robin
class PartitionsController(base.PartitionsBase):
def __init__(self, *args, **kwargs):
super(PartitionsController, self).__init__(*args, **kwargs)
self.driver.db['partitions'] = {}
self._col = self.driver.db['partitions']
self._rr = round_robin.Selector()
def list(self):
for entry in sorted(self._col.values(), key=lambda x: x['n']):
yield _normalize(entry)
def select(self, name):
partition = self.get(name)
return self._rr.next(partition['name'], partition['hosts'])
def get(self, name):
entry = None
try:
entry = self._col[name]
except KeyError:
raise exceptions.PartitionNotFound(name)
return _normalize(entry)
def exists(self, name):
return self._col.get(name) is not None
def create(self, name, weight, hosts):
self._col[name] = {'n': name,
'w': weight,
'h': hosts}
def delete(self, name):
try:
del self._col[name]
except KeyError:
pass
def _normalize(entry):
return {
'name': six.text_type(entry['n']),
'hosts': entry['h'],
'weight': entry['w']
}

View File

@ -13,34 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""helpers: utilities for performing common operations for resources."""
import falcon
import msgpack
import requests
def get_first_host(client):
"""Returns the first host from the first partition."""
try:
partition = next(p.decode('utf8') for p in
client.lrange('ps', 0, 0))
except StopIteration:
raise falcon.HTTPNotFound('No partitions registered')
key = 'p.%s' % partition
ns = msgpack.loads(client.hget(key, 'n'))
return next(n.decode('utf8') for n in ns)
def get_host_by_project_and_queue(client, project, queue):
"""Fetches the host address for a given project and queue.
:returns: a host address as stored or None if not found
"""
key = 'q.%s.%s' % (project, queue)
if not client.exists(key):
return None
return client.hget(key, 'h').decode('utf8')
def get_project(request):
"""Retrieves the Project-Id header from a request.
@ -49,17 +24,11 @@ def get_project(request):
return request.get_header('x_project_id') or '_'
def forward(client, request, queue):
"""Forwards a request to the appropriate host based on the location
of a given queue.
def forward(host, request):
"""Forwards a request.
:returns: a python-requests response object
:raises: falcon.HTTPNotFound if the queue cannot be found in the catalogue
"""
project = get_project(request)
host = get_host_by_project_and_queue(client, project, queue)
if not host:
raise falcon.HTTPNotFound()
url = host + request.path
if request.query_string:
url += '?' + request.query_string

View File

@ -15,49 +15,32 @@
"""node: utilities for implementing partition and node selections."""
import random
import msgpack
def weighted_select(client):
def weighted_select(partitions):
"""Select a partition from all the partitions registered using a weighted
selection algorithm.
:raises: RuntimeError if no partitions are registered
:param partitions: gen({'name': ..., 'weight': ..., 'hosts': ...}, ...)
:return: (name, hosts)
"""
acc = 0
lookup = []
# TODO(cpp-cabrera): the lookup table can be constructed once each time
# an entry is added/removed to/from the catalogue,
# an entry is added to/removed from the catalogue,
# rather than each time a queue is created.
# construct the (partition, weight) lookup table
for p in client.lrange('ps', 0, -1):
key = 'p.%s' % p.decode('utf8')
w = client.hget(key, 'w')
acc += int(w)
lookup.append((p.decode('utf8'), acc))
for p in partitions:
acc += p['weight']
lookup.append((p, acc))
if not lookup:
return None
# select a partition from the lookup table
selector = random.randint(0, acc - 1)
last = 0
for p, w in lookup:
weight = int(w)
if selector >= last and selector < weight:
if last <= selector < w:
return p
last = weight
raise RuntimeError('No partition could be selected - are any registered?')
def round_robin(client, partition):
"""Select a node in this partition and update the round robin index.
:returns: the address of a given node
:side-effect: updates the current index in the storage node for
this partition
"""
n, c = client.hmget('p.%s' % partition, ['n', 'c'])
nodes = [entry.decode('utf8') for entry in msgpack.loads(n)]
current = int(c)
client.hset('p.%s' % partition, 'c', (current + 1) % len(nodes))
return nodes[current]
last = w

View File

@ -0,0 +1,32 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""round_robin: Implements round-robin selection for partition hosts."""
import itertools
class Selector(object):
def __init__(self):
self._index = {}
def next(self, name, hosts):
"""Round robin selection of hosts
:param name: str - name to associate this list with
:param hosts: [a] - list of things to round robin. In the context
of Marconi, this is a list of URLs.
"""
if name not in self._index:
self._index[name] = itertools.cycle(hosts)
return next(self._index[name])

View File

@ -14,6 +14,9 @@
# limitations under the License.
import contextlib
import uuid
import six
@contextlib.contextmanager
@ -44,3 +47,76 @@ def expect(*exc_type):
else:
raise AssertionError(
'Not raised: %s' % ', '.join(e.__name__ for e in exc_type))
@contextlib.contextmanager
def partitions(controller, count):
"""context_manager: Creates `count` partitions in storage,
and deletes them once this goes out of scope.
:param partitions_controller:
:returns: [(str, int, [str])] - names, weights, hosts
"""
spec = [(six.text_type(uuid.uuid1()), i,
[six.text_type(i)]) for i in range(count)]
for n, w, h in spec:
controller.create(n, w, h)
yield spec
for n, _, _ in spec:
controller.delete(n)
@contextlib.contextmanager
def partition(controller, name, weight, hosts):
"""context_manager: Creates a single partition that is deleted
once this context manager goes out of scope.
:param controller: storage handler
:param name: str - partition name
:param weight: int - partition weight
:param hosts: [str] - hosts associated with this partition
:returns: (str, int, [str]) - name, weight, host used in construction
"""
controller.create(name, weight, hosts)
yield (name, weight, hosts)
controller.delete(name)
@contextlib.contextmanager
def entry(controller, project, queue, partition, host, metadata={}):
"""Creates a catalogue entry with the given details, and deletes
it once the context manager goes out of scope.
:param controller: storage handler
:param project: str - namespace for queue
:param queue: str - name of queue
:param partition: str - associated partition
:param host: str - representative host
:returns: (str, str, str, str, dict) - (project, queue, part, host, meta)
"""
controller.insert(project, queue, partition, host, metadata)
yield (project, queue, partition, host, metadata)
controller.delete(project, queue)
@contextlib.contextmanager
def entries(controller, count):
"""Creates `count` catalogue entries with the given details, and
deletes them once the context manager goes out of scope.
:param controller: storage handler
:returns: [(str, str, str, str)] - [(project, queue, partition, host)]
"""
spec = [(u'_', six.text_type(uuid.uuid1()), six.text_type(i),
six.text_type(i))
for i in range(count)]
for p, q, n, h in spec:
controller.insert(p, q, n, h)
yield spec
for p, q, _, _ in spec:
controller.delete(p, q)

View File

@ -12,9 +12,14 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
import six
from marconi.proxy import storage
from marconi.proxy.storage import exceptions
from marconi import tests as testing
from marconi.tests import helpers
class ControllerBaseTest(testing.TestBase):
@ -43,18 +48,161 @@ class PartitionsControllerTest(ControllerBaseTest):
def setUp(self):
super(PartitionsControllerTest, self).setUp()
self.partitions_controller = self.driver.partitions_controller
self.controller = self.driver.partitions_controller
self.name = six.text_type(uuid.uuid1())
def tearDown(self):
super(PartitionsControllerTest, self).tearDown()
def _check_structure(self, partition):
self.assertIn('name', partition)
self.assertIsInstance(partition['name'], six.text_type)
self.assertIn('hosts', partition)
self.assertIsInstance(partition['hosts'], list)
self.assertIsInstance(partition['hosts'][0], six.text_type)
self.assertIn('weight', partition)
self.assertIsInstance(partition['weight'], int)
def _check_values(self, partition, xname, xweight, xhosts):
self.assertEqual(partition['name'], xname)
self.assertEqual(partition['weight'], xweight)
self.assertEqual(partition['hosts'], xhosts)
def test_partition_life_cycle(self):
# check listing is initially empty
for p in self.controller.list():
self.fail('There should be no partitions at this time')
# create a listing, check its length
with helpers.partitions(self.controller, 10):
ps = list(self.controller.list())
self.assertEqual(len(ps), 10)
# create, check existence, delete
with helpers.partition(self.controller, self.name, 1, ['a']):
self.assertTrue(self.controller.exists(self.name))
# verify it no longer exists
self.assertFalse(self.controller.exists(self.name))
# verify it isn't listable
self.assertEqual(len(list(self.controller.list())), 0)
def test_list(self):
with helpers.partitions(self.controller, 10) as expect:
values = zip(self.controller.list(), expect)
for p, x in values:
n, w, h = x
self._check_structure(p)
self._check_values(p, xname=n, xweight=w, xhosts=h)
def test_select(self):
name = self.name
with helpers.partition(self.controller, name, 10, ['a', 'b', 'c']):
for i in range(3):
self.assertEqual(self.controller.select(name), 'a')
self.assertEqual(self.controller.select(name), 'b')
self.assertEqual(self.controller.select(name), 'c')
def test_select_raises_with_no_partitions(self):
self.assertRaises(exceptions.PartitionNotFound,
self.controller.select, ('not_found'))
def test_get(self):
name = self.name
with helpers.partition(self.controller, name, 10, ['a']) as expect:
p = self.controller.get(name)
n, w, h = expect
self._check_values(p, xname=n, xweight=w, xhosts=h)
def test_get_nonexistent_throws(self):
self.assertRaises(exceptions.PartitionNotFound,
self.controller.get, ('not_found'))
def test_exists(self):
name = self.name
with helpers.partition(self.controller, name, 10, ['a']):
self.assertTrue(self.controller.exists(name))
def test_create_overwrites(self):
name = self.name
with helpers.partition(self.controller, name, 1, ['a']):
with helpers.partition(self.controller, name, 2, ['b']) as p2:
fetched = self.controller.get(name)
n, w, h = p2
self._check_values(fetched, xname=n, xweight=w, xhosts=h)
class CatalogueControllerTest(ControllerBaseTest):
controller_base_class = storage.CatalogueBase
def setUp(self):
super(CatalogueControllerTest, self).setUp()
self.catalogue_controller = self.driver.catalogue_controller
self.controller = self.driver.catalogue_controller
self.queue = six.text_type(uuid.uuid1())
self.project = six.text_type(uuid.uuid1())
def tearDown(self):
super(CatalogueControllerTest, self).tearDown()
def _check_structure(self, entry):
self.assertIn('name', entry)
self.assertIn('metadata', entry)
self.assertIn('partition', entry)
self.assertIn('host', entry)
self.assertIsInstance(entry['name'], six.text_type)
self.assertIsInstance(entry['metadata'], dict)
self.assertIsInstance(entry['partition'], six.text_type)
self.assertIsInstance(entry['host'], six.text_type)
def _check_value(self, entry, xname, xmeta, xpartition, xhost):
self.assertEqual(entry['name'], xname)
self.assertEqual(entry['metadata'], xmeta)
self.assertEqual(entry['partition'], xpartition)
self.assertEqual(entry['host'], xhost)
def test_catalogue_entry_life_cycle(self):
queue = self.queue
project = self.project
# check listing is initially empty
for p in self.controller.list(project):
self.fail('There should be no entries at this time')
# create a listing, check its length
with helpers.entries(self.controller, 10):
xs = list(self.controller.list(u'_'))
self.assertEqual(len(xs), 10)
# create, check existence, delete
with helpers.entry(self.controller, project, queue, u'a', u'a'):
self.assertTrue(self.controller.exists(project, queue))
# verify it no longer exists
self.assertFalse(self.controller.exists(project, queue))
# verify it isn't listable
self.assertEqual(len(list(self.controller.list(project))), 0)
def test_list(self):
with helpers.entries(self.controller, 10) as expect:
values = zip(self.controller.list(u'_'), expect)
for e, x in values:
p, q, n, h = x
self._check_structure(e)
self._check_value(e, xname=q, xmeta={},
xpartition=n, xhost=h)
def test_get(self):
with helpers.entry(self.controller, self.project, self.queue,
u'a', u'a') as expect:
p, q, n, h, m = expect
e = self.controller.get(p, q)
self._check_value(e, xname=q, xmeta=m,
xpartition=n, xhost=h)
def test_exists(self):
with helpers.entry(self.controller, self.project, self.queue,
u'a', u'a') as expect:
p, q, _, _, _ = expect
self.assertTrue(self.controller.exists(p, q))

View File

@ -0,0 +1,49 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from marconi.proxy.storage import memory
from marconi.proxy.storage.memory import controllers
from marconi import tests as testing
from tests.unit.proxy.storage import base
class MemoryDriverTest(testing.TestBase):
def setUp(self):
super(MemoryDriverTest, self).setUp()
class MemoryPartitionsTest(base.PartitionsControllerTest):
driver_class = memory.Driver
controller_class = controllers.PartitionsController
def setUp(self):
super(MemoryPartitionsTest, self).setUp()
def tearDown(self):
super(MemoryPartitionsTest, self).tearDown()
class MemoryCatalogueTest(base.CatalogueControllerTest):
driver_class = memory.Driver
controller_class = controllers.CatalogueController
def setUp(self):
super(MemoryCatalogueTest, self).setUp()
def tearDown(self):
super(MemoryCatalogueTest, self).tearDown()