From 25b5794babbd854d5bcd9a2c01fc82f96e8ca4a6 Mon Sep 17 00:00:00 2001 From: Zhihao Yuan Date: Tue, 22 Oct 2013 10:18:50 -0400 Subject: [PATCH] feat(shard): queue listing from multiple sources Sharding needs a special storage driver wrapper for queue-related operations. The listing queue algorithm merges the listing results from multiple storage connections, and returns them in sorted order. This patch also fixes non-configurable default limits in sharding. Change-Id: If9bd80459bd1a0dc92ea68c42bc28c0f6dde2a65 Partially-implements: blueprint storage-sharding --- marconi/queues/storage/base.py | 6 +- marconi/queues/storage/sharding.py | 63 +++++++++++-- marconi/queues/storage/utils.py | 21 +++++ marconi/tests/faulty_storage.py | 2 +- .../unit/queues/storage/test_shard_queues.py | 93 +++++++++++++++++++ 5 files changed, 171 insertions(+), 14 deletions(-) create mode 100644 tests/unit/queues/storage/test_shard_queues.py diff --git a/marconi/queues/storage/base.py b/marconi/queues/storage/base.py index 7fccc69da..a1cdda7f7 100644 --- a/marconi/queues/storage/base.py +++ b/marconi/queues/storage/base.py @@ -120,8 +120,8 @@ class QueueBase(ControllerBase): """ @abc.abstractmethod - def list(self, project=None, marker=None, limit=10, - detailed=False): + def list(self, project=None, marker=None, + limit=None, detailed=False): """Base method for listing queues. :param project: Project id @@ -335,7 +335,7 @@ class ClaimBase(ControllerBase): raise NotImplementedError @abc.abstractmethod - def create(self, queue, metadata, project=None, limit=10): + def create(self, queue, metadata, project=None, limit=None): """Base method for creating a claim. 
:param queue: Name of the queue this diff --git a/marconi/queues/storage/sharding.py b/marconi/queues/storage/sharding.py index eb90ad35f..258d23304 100644 --- a/marconi/queues/storage/sharding.py +++ b/marconi/queues/storage/sharding.py @@ -14,6 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import heapq +import itertools + from oslo.config import cfg import six @@ -105,11 +108,36 @@ class QueueController(RoutingController): self._lookup = self._shard_catalog.lookup def list(self, project=None, marker=None, - limit=10, detailed=False): - # TODO(cpp-cabrera): fill in sharded list queues - # implementation. + limit=None, detailed=False): - return [] + def all_pages(): + for shard in self._shard_catalog._shards_ctrl.list(limit=0): + yield next(self._shard_catalog.get_driver(shard['name']) + .queue_controller.list( + project=project, + marker=marker, + limit=limit, + detailed=detailed)) + + # make a heap compared with 'name' + ls = heapq.merge(*[ + utils.keyify('name', page) + for page in all_pages() + ]) + + if limit is None: + limit = self._shard_catalog._limits_conf.default_queue_paging + + marker_name = {} + + # limit the iterator and strip out the comparison wrapper + def it(): + for queue_cmp in itertools.islice(ls, limit): + marker_name['next'] = queue_cmp.obj['name'] + yield queue_cmp.obj + + yield it() + yield marker_name['next'] def create(self, name, project=None): self._shard_catalog.register(name, project) @@ -211,8 +239,8 @@ class MessageController(RoutingController): message_ids=message_ids) return [] - def list(self, queue, project, marker=None, limit=10, echo=False, - client_uuid=None, include_claimed=False): + def list(self, queue, project, marker=None, limit=None, + echo=False, client_uuid=None, include_claimed=False): target = self._lookup(queue, project) if target: control = target.message_controller @@ -280,6 +308,11 @@ class Catalog(object): 
self._conf.register_opts(_CATALOG_OPTIONS, group=_CATALOG_GROUP) self._catalog_conf = self._conf[_CATALOG_GROUP] + + self._conf.register_opts(storage.base._LIMITS_OPTIONS, + group=storage.base._LIMITS_GROUP) + self._limits_conf = self._conf[storage.base._LIMITS_GROUP] + self._shards_ctrl = control.shards_controller self._catalogue_ctrl = control.catalogue_controller @@ -379,10 +412,20 @@ class Catalog(object): except errors.QueueNotMapped: return None + return self.get_driver(shard_id) + + def get_driver(self, shard_id): + """Get storage driver, preferably cached, from a shard name. + + :param shard_id: The name of a shard. + :type shard_id: six.text_type + :returns: a storage driver + :rtype: marconi.queues.storage.base.DataDriver + """ + # NOTE(cpp-cabrera): cache storage driver connection try: - driver = self._drivers[shard_id] + return self._drivers[shard_id] except KeyError: - self._drivers[shard_id] = driver = self._init_driver(shard_id) - - return driver + self._drivers[shard_id] = self._init_driver(shard_id) + return self._drivers[shard_id] diff --git a/marconi/queues/storage/utils.py b/marconi/queues/storage/utils.py index 298d9820b..e8661eb5d 100644 --- a/marconi/queues/storage/utils.py +++ b/marconi/queues/storage/utils.py @@ -43,3 +43,24 @@ def load_storage_driver(conf, control_mode=False): except RuntimeError as exc: LOG.exception(exc) raise errors.InvalidDriver(exc) + + +def keyify(key, iterable): + """Make an iterator from an iterable of dicts compared with a key. 
+ + :param key: A key that exists in every dict inside the iterable object + :param iterable: The input iterable object + """ + + class Keyed(object): + def __init__(self, obj): + self.obj = obj + + def __cmp__(self, other): + return cmp(self.obj[key], other.obj[key]) + + # TODO(zyuan): define magic operators to make py3 work + # http://code.activestate.com/recipes/576653/ + + for item in iterable: + yield Keyed(item) diff --git a/marconi/tests/faulty_storage.py b/marconi/tests/faulty_storage.py index f769b19f8..3b265969b 100644 --- a/marconi/tests/faulty_storage.py +++ b/marconi/tests/faulty_storage.py @@ -91,7 +91,7 @@ class MessageController(storage.MessageBase): raise NotImplementedError() def list(self, queue, project=None, marker=None, - limit=10, echo=False, client_uuid=None): + limit=None, echo=False, client_uuid=None): raise NotImplementedError() def post(self, queue, messages, project=None): diff --git a/tests/unit/queues/storage/test_shard_queues.py b/tests/unit/queues/storage/test_shard_queues.py new file mode 100644 index 000000000..78e8c44e4 --- /dev/null +++ b/tests/unit/queues/storage/test_shard_queues.py @@ -0,0 +1,93 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import random +import uuid + +from oslo.config import cfg + +from marconi.queues.storage import sharding +from marconi.queues.storage import utils +from marconi import tests as testing +from marconi.tests import base + + +class TestShardQueues(base.TestBase): + + @testing.requires_mongodb + def setUp(self): + super(TestShardQueues, self).setUp() + conf = self.load_conf('wsgi_mongodb_sharded.conf') + + conf.register_opts([cfg.StrOpt('storage')], + group='queues:drivers') + + control = utils.load_storage_driver(conf, control_mode=True) + self.shards_ctrl = control.shards_controller + self.controller = sharding.DataDriver(conf, control).queue_controller + + # fake two shards + for _ in xrange(2): + self.shards_ctrl.create(str(uuid.uuid1()), 100, 'sqlite://memory') + + def tearDown(self): + self.shards_ctrl.drop_all() + super(TestShardQueues, self).tearDown() + + def test_listing(self): + project = "I.G" + + interaction = self.controller.list(project=project, + detailed=False) + queues = list(next(interaction)) + + self.assertEqual(len(queues), 0) + + for n in xrange(10): + name = 'queue_%d' % n + self.controller.create(name, project=project) + self.controller.set_metadata(name, + metadata=random.getrandbits(12), + project=project) + + interaction = self.controller.list(project=project, + detailed=True, + limit=7) + queues.extend(next(interaction)) + marker = next(interaction) + + self.assertEqual(len(queues), 7) + + interaction = self.controller.list(project=project, + detailed=True, + limit=7, + marker=marker) + queues.extend(next(interaction)) + + self.assertEqual(len(queues), 10) + + # ordered by name as a whole + self.assertTrue(all(queues[i]['name'] <= queues[i + 1]['name'] + for i in xrange(len(queues) - 1))) + + for n in xrange(10): + self.controller.delete('queue_%d' % n, project=project) + + interaction = self.controller.list(project=project, + detailed=False) + queues = list(next(interaction)) + + self.assertEqual(len(queues), 0)