feat(shard): queue listing from multiple sources
Sharding need special storage driver wrapper for queue-related operations. The listing queue algorithm merges the listing result from multiple storage connections, and return them in sorted order. This patch also fixes non-configurable default limits in sharding. Change-Id: If9bd80459bd1a0dc92ea68c42bc28c0f6dde2a65 Partitally-implements: blueprint storage-sharding
This commit is contained in:
parent
b20519cb17
commit
25b5794bab
@ -120,8 +120,8 @@ class QueueBase(ControllerBase):
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def list(self, project=None, marker=None, limit=10,
|
||||
detailed=False):
|
||||
def list(self, project=None, marker=None,
|
||||
limit=None, detailed=False):
|
||||
"""Base method for listing queues.
|
||||
|
||||
:param project: Project id
|
||||
@ -335,7 +335,7 @@ class ClaimBase(ControllerBase):
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def create(self, queue, metadata, project=None, limit=10):
|
||||
def create(self, queue, metadata, project=None, limit=None):
|
||||
"""Base method for creating a claim.
|
||||
|
||||
:param queue: Name of the queue this
|
||||
|
@ -14,6 +14,9 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import heapq
|
||||
import itertools
|
||||
|
||||
from oslo.config import cfg
|
||||
import six
|
||||
|
||||
@ -105,11 +108,36 @@ class QueueController(RoutingController):
|
||||
self._lookup = self._shard_catalog.lookup
|
||||
|
||||
def list(self, project=None, marker=None,
|
||||
limit=10, detailed=False):
|
||||
# TODO(cpp-cabrera): fill in sharded list queues
|
||||
# implementation.
|
||||
limit=None, detailed=False):
|
||||
|
||||
return []
|
||||
def all_pages():
|
||||
for shard in self._shard_catalog._shards_ctrl.list(limit=0):
|
||||
yield next(self._shard_catalog.get_driver(shard['name'])
|
||||
.queue_controller.list(
|
||||
project=project,
|
||||
marker=marker,
|
||||
limit=limit,
|
||||
detailed=detailed))
|
||||
|
||||
# make a heap compared with 'name'
|
||||
ls = heapq.merge(*[
|
||||
utils.keyify('name', page)
|
||||
for page in all_pages()
|
||||
])
|
||||
|
||||
if limit is None:
|
||||
limit = self._shard_catalog._limits_conf.default_queue_paging
|
||||
|
||||
marker_name = {}
|
||||
|
||||
# limit the iterator and strip out the comparison wrapper
|
||||
def it():
|
||||
for queue_cmp in itertools.islice(ls, limit):
|
||||
marker_name['next'] = queue_cmp.obj['name']
|
||||
yield queue_cmp.obj
|
||||
|
||||
yield it()
|
||||
yield marker_name['next']
|
||||
|
||||
def create(self, name, project=None):
|
||||
self._shard_catalog.register(name, project)
|
||||
@ -211,8 +239,8 @@ class MessageController(RoutingController):
|
||||
message_ids=message_ids)
|
||||
return []
|
||||
|
||||
def list(self, queue, project, marker=None, limit=10, echo=False,
|
||||
client_uuid=None, include_claimed=False):
|
||||
def list(self, queue, project, marker=None, limit=None,
|
||||
echo=False, client_uuid=None, include_claimed=False):
|
||||
target = self._lookup(queue, project)
|
||||
if target:
|
||||
control = target.message_controller
|
||||
@ -280,6 +308,11 @@ class Catalog(object):
|
||||
|
||||
self._conf.register_opts(_CATALOG_OPTIONS, group=_CATALOG_GROUP)
|
||||
self._catalog_conf = self._conf[_CATALOG_GROUP]
|
||||
|
||||
self._conf.register_opts(storage.base._LIMITS_OPTIONS,
|
||||
group=storage.base._LIMITS_GROUP)
|
||||
self._limits_conf = self._conf[storage.base._LIMITS_GROUP]
|
||||
|
||||
self._shards_ctrl = control.shards_controller
|
||||
self._catalogue_ctrl = control.catalogue_controller
|
||||
|
||||
@ -379,10 +412,20 @@ class Catalog(object):
|
||||
except errors.QueueNotMapped:
|
||||
return None
|
||||
|
||||
return self.get_driver(shard_id)
|
||||
|
||||
def get_driver(self, shard_id):
|
||||
"""Get storage driver, preferabaly cached, fron a shard name.
|
||||
|
||||
:param shard_id: The name of a shard.
|
||||
:type shard_id: six.text_type
|
||||
:returns: a storage driver
|
||||
:rtype: marconi.queues.storage.base.DataDriver
|
||||
"""
|
||||
|
||||
# NOTE(cpp-cabrera): cache storage driver connection
|
||||
try:
|
||||
driver = self._drivers[shard_id]
|
||||
return self._drivers[shard_id]
|
||||
except KeyError:
|
||||
self._drivers[shard_id] = driver = self._init_driver(shard_id)
|
||||
|
||||
return driver
|
||||
self._drivers[shard_id] = self._init_driver(shard_id)
|
||||
return self._drivers[shard_id]
|
||||
|
@ -43,3 +43,24 @@ def load_storage_driver(conf, control_mode=False):
|
||||
except RuntimeError as exc:
|
||||
LOG.exception(exc)
|
||||
raise errors.InvalidDriver(exc)
|
||||
|
||||
|
||||
def keyify(key, iterable):
|
||||
"""Make an iterator from an iterable of dicts compared with a key.
|
||||
|
||||
:param key: A key exists for all dict inside the iterable object
|
||||
:param iterable: The input iterable object
|
||||
"""
|
||||
|
||||
class Keyed(object):
|
||||
def __init__(self, obj):
|
||||
self.obj = obj
|
||||
|
||||
def __cmp__(self, other):
|
||||
return cmp(self.obj[key], other.obj[key])
|
||||
|
||||
# TODO(zyuan): define magic operators to make py3 work
|
||||
# http://code.activestate.com/recipes/576653/
|
||||
|
||||
for item in iterable:
|
||||
yield Keyed(item)
|
||||
|
@ -91,7 +91,7 @@ class MessageController(storage.MessageBase):
|
||||
raise NotImplementedError()
|
||||
|
||||
def list(self, queue, project=None, marker=None,
|
||||
limit=10, echo=False, client_uuid=None):
|
||||
limit=None, echo=False, client_uuid=None):
|
||||
raise NotImplementedError()
|
||||
|
||||
def post(self, queue, messages, project=None):
|
||||
|
93
tests/unit/queues/storage/test_shard_queues.py
Normal file
93
tests/unit/queues/storage/test_shard_queues.py
Normal file
@ -0,0 +1,93 @@
|
||||
# Copyright (c) 2013 Rackspace, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
#
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import random
|
||||
import uuid
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from marconi.queues.storage import sharding
|
||||
from marconi.queues.storage import utils
|
||||
from marconi import tests as testing
|
||||
from marconi.tests import base
|
||||
|
||||
|
||||
class TestShardQueues(base.TestBase):
|
||||
|
||||
@testing.requires_mongodb
|
||||
def setUp(self):
|
||||
super(TestShardQueues, self).setUp()
|
||||
conf = self.load_conf('wsgi_mongodb_sharded.conf')
|
||||
|
||||
conf.register_opts([cfg.StrOpt('storage')],
|
||||
group='queues:drivers')
|
||||
|
||||
control = utils.load_storage_driver(conf, control_mode=True)
|
||||
self.shards_ctrl = control.shards_controller
|
||||
self.controller = sharding.DataDriver(conf, control).queue_controller
|
||||
|
||||
# fake two shards
|
||||
for _ in xrange(2):
|
||||
self.shards_ctrl.create(str(uuid.uuid1()), 100, 'sqlite://memory')
|
||||
|
||||
def tearDown(self):
|
||||
self.shards_ctrl.drop_all()
|
||||
super(TestShardQueues, self).tearDown()
|
||||
|
||||
def test_listing(self):
|
||||
project = "I.G"
|
||||
|
||||
interaction = self.controller.list(project=project,
|
||||
detailed=False)
|
||||
queues = list(next(interaction))
|
||||
|
||||
self.assertEqual(len(queues), 0)
|
||||
|
||||
for n in xrange(10):
|
||||
name = 'queue_%d' % n
|
||||
self.controller.create(name, project=project)
|
||||
self.controller.set_metadata(name,
|
||||
metadata=random.getrandbits(12),
|
||||
project=project)
|
||||
|
||||
interaction = self.controller.list(project=project,
|
||||
detailed=True,
|
||||
limit=7)
|
||||
queues.extend(next(interaction))
|
||||
marker = next(interaction)
|
||||
|
||||
self.assertEqual(len(queues), 7)
|
||||
|
||||
interaction = self.controller.list(project=project,
|
||||
detailed=True,
|
||||
limit=7,
|
||||
marker=marker)
|
||||
queues.extend(next(interaction))
|
||||
|
||||
self.assertEqual(len(queues), 10)
|
||||
|
||||
# ordered by name as a whole
|
||||
self.assertTrue(all(queues[i]['name'] <= queues[i + 1]['name']
|
||||
for i in xrange(len(queues) - 1)))
|
||||
|
||||
for n in xrange(10):
|
||||
self.controller.delete('queue_%d' % n, project=project)
|
||||
|
||||
interaction = self.controller.list(project=project,
|
||||
detailed=False)
|
||||
queues = list(next(interaction))
|
||||
|
||||
self.assertEqual(len(queues), 0)
|
Loading…
x
Reference in New Issue
Block a user