From c3a57187753e85e308266e570fdac87dc24b6ec0 Mon Sep 17 00:00:00 2001 From: Shaifali Agrawal Date: Tue, 24 Feb 2015 00:22:51 -0800 Subject: [PATCH] Remove QueueController from data to control plane To separate data and control plane of storage layer; Moving queue_controller from data to controll plane. Trying to shift the QueueController object in Control plane. For this new MessageQueueHandler class is also added inside storage/x/messages.py And the newly added handler is added to the entry point. Implements: blueprint split-data-and-control-plane Co-Author: Flavio Percoco Change-Id: I8a167d6ed8e54c98b077b9ea56e68b4e8d5b0291 --- setup.cfg | 7 + tests/etc/wsgi_mongodb_pooled.conf | 2 +- tests/functional/wsgi/v1_1/test_health.py | 20 +- tests/unit/storage/test_impl_mongodb.py | 85 ++--- tests/unit/storage/test_impl_redis.py | 33 +- tests/unit/storage/test_pool_catalog.py | 6 +- tests/unit/storage/test_pool_queues.py | 99 ------ zaqar/bootstrap.py | 6 +- zaqar/storage/base.py | 19 +- zaqar/storage/mongodb/driver.py | 34 +- zaqar/storage/mongodb/messages.py | 61 ++++ zaqar/storage/mongodb/queues.py | 29 +- zaqar/storage/mongodb/subscriptions.py | 3 +- zaqar/storage/pipeline.py | 21 +- zaqar/storage/pooling.py | 11 +- zaqar/storage/redis/driver.py | 12 +- zaqar/storage/redis/messages.py | 61 ++++ zaqar/storage/redis/queues.py | 38 +- zaqar/storage/sqlalchemy/claims.py | 6 +- zaqar/storage/sqlalchemy/messages.py | 59 +++- zaqar/storage/sqlalchemy/queues.py | 42 +-- zaqar/storage/utils.py | 18 +- zaqar/tests/faulty_storage.py | 10 +- zaqar/tests/unit/storage/base.py | 330 +++++++++--------- .../websocket/v1_1/test_queue_lifecycle.py | 2 +- .../unit/transport/wsgi/v1/test_claims.py | 3 +- .../transport/wsgi/v1/test_queue_lifecycle.py | 2 +- .../unit/transport/wsgi/v1_1/test_claims.py | 3 +- .../wsgi/v1_1/test_queue_lifecycle.py | 2 +- .../unit/transport/wsgi/v2_0/test_claims.py | 3 +- .../wsgi/v2_0/test_queue_lifecycle.py | 3 +- 31 files changed, 534 insertions(+), 496 deletions(-) delete mode 100644 tests/unit/storage/test_pool_queues.py diff --git a/setup.cfg b/setup.cfg index bb9eb71f4..d4e9d2fda 100644 --- a/setup.cfg +++ b/setup.cfg @@ -72,6 +72,13 @@ oslo.config.opts = zaqar.storage.stages = zaqar.notification.notifier = zaqar.notification.notifier:NotifierDriver +zaqar.storage.mongodb.driver.queue.stages = + message_queue_handler = zaqar.storage.mongodb.messages:MessageQueueHandler + +zaqar.storage.redis.driver.queue.stages = + message_queue_handler = zaqar.storage.redis.messages:MessageQueueHandler + + [nosetests] where=tests verbosity=2 diff --git a/tests/etc/wsgi_mongodb_pooled.conf b/tests/etc/wsgi_mongodb_pooled.conf index a8f717957..eb67f474b 100644 --- a/tests/etc/wsgi_mongodb_pooled.conf +++ b/tests/etc/wsgi_mongodb_pooled.conf @@ -9,7 +9,7 @@ storage = mongodb [drivers:message_store:mongodb] uri = mongodb://127.0.0.1:27017 -database = zaqar_test +database = zaqar_test_pooled [drivers:management_store:mongodb] uri = mongodb://127.0.0.1:27017 diff --git a/tests/functional/wsgi/v1_1/test_health.py b/tests/functional/wsgi/v1_1/test_health.py index a09a15e44..2db6b8d64 100644 --- a/tests/functional/wsgi/v1_1/test_health.py +++ b/tests/functional/wsgi/v1_1/test_health.py @@ -12,13 +12,11 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. -import ddt from zaqar.tests.functional import base from zaqar.tests.functional import helpers -@ddt.ddt class TestHealth(base.V1_1FunctionalTestBase): server_class = base.ZaqarAdminServer @@ -37,22 +35,16 @@ class TestHealth(base.V1_1FunctionalTestBase): self.client.set_base_url(self.base_url) - @ddt.data( - { - 'name': "pool_1", - 'weight': 10, - 'uri': "mongodb://localhost:27017" - } - ) - def test_health_with_pool(self, params): + def test_health_with_pool(self): # FIXME(flwang): Please use mongodb after the sqlalchemy is disabled # as pool node and the mongodb is working on gate successfully. doc = helpers.create_pool_body( - weight=params.get('weight', 10), - uri=params.get('uri', "mongodb://localhost:27017") + weight=10, + uri="mongodb://localhost:27017", + options=dict(database='zaqar_test_pooled_1') ) - pool_name = params.get('name', "pool_1") + pool_name = "pool_1" self.addCleanup(self.client.delete, url='/pools/' + pool_name) result = self.client.put('/pools/' + pool_name, data=doc) @@ -86,7 +78,7 @@ class TestHealth(base.V1_1FunctionalTestBase): self.assertEqual(health[pool_name]['storage_reachable'], True) op_status = health[pool_name]['operation_status'] for op in op_status.keys(): - self.assertTrue(op_status[op]['succeeded']) + self.assertTrue(op_status[op]['succeeded']) message_volume = health[pool_name]['message_volume'] self.assertEqual(message_volume['claimed'], 2) diff --git a/tests/unit/storage/test_impl_mongodb.py b/tests/unit/storage/test_impl_mongodb.py index 05ebd9875..12f55ce82 100644 --- a/tests/unit/storage/test_impl_mongodb.py +++ b/tests/unit/storage/test_impl_mongodb.py @@ -40,18 +40,24 @@ from zaqar.tests.unit.storage import base class MongodbSetupMixin(object): def _purge_databases(self): - databases = (self.driver.message_databases + - [self.driver.queues_database, - self.driver.subscriptions_database]) + if isinstance(self.driver, mongodb.DataDriver): + databases = (self.driver.message_databases + + [self.control.queues_database, + self.driver.subscriptions_database]) + else: + databases = [self.driver.queues_database] for db in databases: self.driver.connection.drop_database(db) def _prepare_conf(self): - self.config(options.MESSAGE_MONGODB_GROUP, - database=uuid.uuid4().hex) - self.config(options.MANAGEMENT_MONGODB_GROUP, - database=uuid.uuid4().hex) + if options.MESSAGE_MONGODB_GROUP in self.conf: + self.config(options.MESSAGE_MONGODB_GROUP, + database=uuid.uuid4().hex) + + if options.MANAGEMENT_MONGODB_GROUP in self.conf: + self.config(options.MANAGEMENT_MONGODB_GROUP, + database=uuid.uuid4().hex) class MongodbUtilsTest(MongodbSetupMixin, testing.TestBase): @@ -69,6 +75,7 @@ class MongodbUtilsTest(MongodbSetupMixin, testing.TestBase): MockDriver = collections.namedtuple('MockDriver', 'mongodb_conf') self.driver = MockDriver(self.mongodb_conf) + self.control_driver = MockDriver(self.mongodb_conf) def test_scope_queue_name(self): self.assertEqual(utils.scope_queue_name('my-q'), '/my-q') @@ -153,14 +160,12 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase): def test_db_instance(self): self.config(unreliable=True) cache = oslo_cache.get_cache() - driver = mongodb.DataDriver(self.conf, cache) + control = mongodb.ControlDriver(self.conf, cache) + data = mongodb.DataDriver(self.conf, cache, control) - databases = (driver.message_databases + - [driver.queues_database]) - - for db in databases: + for db in data.message_databases: self.assertThat(db.name, matchers.StartsWith( - driver.mongodb_conf.database)) + data.mongodb_conf.database)) def test_version_match(self): self.config(unreliable=True) @@ -169,12 +174,14 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase): with mock.patch('pymongo.MongoClient.server_info') as info: info.return_value = {'version': '2.1'} self.assertRaises(RuntimeError, mongodb.DataDriver, - self.conf, cache) + self.conf, cache, + mongodb.ControlDriver(self.conf, cache)) info.return_value = {'version': '2.11'} try: - mongodb.DataDriver(self.conf, cache) + mongodb.DataDriver(self.conf, cache, + mongodb.ControlDriver(self.conf, cache)) except RuntimeError: self.fail('version match failed') @@ -186,21 +193,24 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase): with mock.patch('pymongo.MongoClient.is_mongos') as is_mongos: is_mongos.__get__ = mock.Mock(return_value=False) self.assertRaises(RuntimeError, mongodb.DataDriver, - self.conf, cache) + self.conf, cache, + mongodb.ControlDriver(self.conf, cache)) def test_using_replset(self): cache = oslo_cache.get_cache() with mock.patch('pymongo.MongoClient.nodes') as nodes: nodes.__get__ = mock.Mock(return_value=['node1', 'node2']) - mongodb.DataDriver(self.conf, cache) + mongodb.DataDriver(self.conf, cache, + mongodb.ControlDriver(self.conf, cache)) def test_using_mongos(self): cache = oslo_cache.get_cache() with mock.patch('pymongo.MongoClient.is_mongos') as is_mongos: is_mongos.__get__ = mock.Mock(return_value=True) - mongodb.DataDriver(self.conf, cache) + mongodb.DataDriver(self.conf, cache, + mongodb.ControlDriver(self.conf, cache)) def test_write_concern_check_works(self): cache = oslo_cache.get_cache() @@ -211,17 +221,21 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase): with mock.patch('pymongo.MongoClient.write_concern') as wc: wc.__get__ = mock.Mock(return_value={'w': 1}) self.assertRaises(RuntimeError, mongodb.DataDriver, - self.conf, cache) + self.conf, cache, + mongodb.ControlDriver(self.conf, cache)) wc.__get__ = mock.Mock(return_value={'w': 2}) - mongodb.DataDriver(self.conf, cache) + mongodb.DataDriver(self.conf, cache, + mongodb.ControlDriver(self.conf, cache)) def test_write_concern_is_set(self): cache = oslo_cache.get_cache() with mock.patch('pymongo.MongoClient.is_mongos') as is_mongos: is_mongos.__get__ = mock.Mock(return_value=True) - driver = mongodb.DataDriver(self.conf, cache) + driver = mongodb.DataDriver(self.conf, cache, + mongodb.ControlDriver + (self.conf, cache)) wc = driver.connection.write_concern self.assertEqual(wc['w'], 'majority') self.assertEqual(wc['j'], False) @@ -230,25 +244,16 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase): @testing.requires_mongodb class MongodbQueueTests(MongodbSetupMixin, base.QueueControllerTest): - driver_class = mongodb.DataDriver + driver_class = mongodb.ControlDriver config_file = 'wsgi_mongodb.conf' controller_class = controllers.QueueController + control_driver_class = mongodb.ControlDriver def test_indexes(self): collection = self.controller._collection indexes = collection.index_information() self.assertIn('p_q_1', indexes) - def test_messages_purged(self): - queue_name = 'test' - self.controller.create(queue_name) - self.message_controller.post(queue_name, - [{'ttl': 60}], - 1234) - self.controller.delete(queue_name) - for collection in self.message_controller._collections: - self.assertEqual(collection.find({'q': queue_name}).count(), 0) - def test_raises_connection_error(self): with mock.patch.object(cursor.Cursor, @@ -268,6 +273,7 @@ class MongodbMessageTests(MongodbSetupMixin, base.MessageControllerTest): driver_class = mongodb.DataDriver config_file = 'wsgi_mongodb.conf' controller_class = controllers.MessageController + control_driver_class = mongodb.ControlDriver # NOTE(kgriffs): MongoDB's TTL scavenger only runs once a minute gc_interval = 60 @@ -349,6 +355,7 @@ class MongodbFIFOMessageTests(MongodbSetupMixin, base.MessageControllerTest): driver_class = mongodb.DataDriver config_file = 'wsgi_fifo_mongodb.conf' controller_class = controllers.FIFOMessageController + control_driver_class = mongodb.ControlDriver # NOTE(kgriffs): MongoDB's TTL scavenger only runs once a minute gc_interval = 60 @@ -427,6 +434,7 @@ class MongodbClaimTests(MongodbSetupMixin, base.ClaimControllerTest): driver_class = mongodb.DataDriver config_file = 'wsgi_mongodb.conf' controller_class = controllers.ClaimController + control_driver_class = mongodb.ControlDriver def test_claim_doesnt_exist(self): """Verifies that operations fail on expired/missing claims. @@ -462,6 +470,7 @@ class MongodbSubscriptionTests(MongodbSetupMixin, driver_class = mongodb.DataDriver config_file = 'wsgi_mongodb.conf' controller_class = controllers.SubscriptionController + control_driver_class = mongodb.ControlDriver # @@ -473,6 +482,7 @@ class MongodbPoolsTests(base.PoolsControllerTest): config_file = 'wsgi_mongodb.conf' driver_class = mongodb.ControlDriver controller_class = controllers.PoolsController + control_driver_class = mongodb.ControlDriver def setUp(self): super(MongodbPoolsTests, self).setUp() @@ -500,6 +510,7 @@ class MongodbPoolsTests(base.PoolsControllerTest): class MongodbCatalogueTests(base.CatalogueControllerTest): driver_class = mongodb.ControlDriver controller_class = controllers.CatalogueController + control_driver_class = mongodb.ControlDriver def setUp(self): super(MongodbCatalogueTests, self).setUp() @@ -522,15 +533,6 @@ class PooledMessageTests(base.MessageControllerTest): gc_interval = 60 -@testing.requires_mongodb -class PooledQueueTests(base.QueueControllerTest): - config_file = 'wsgi_mongodb_pooled.conf' - controller_class = pooling.QueueController - driver_class = pooling.DataDriver - control_driver_class = mongodb.ControlDriver - controller_base_class = storage.Queue - - @testing.requires_mongodb class PooledClaimsTests(base.ClaimControllerTest): config_file = 'wsgi_mongodb_pooled.conf' @@ -554,6 +556,7 @@ class PooledClaimsTests(base.ClaimControllerTest): class MongodbFlavorsTest(base.FlavorsControllerTest): driver_class = mongodb.ControlDriver controller_class = controllers.FlavorsController + control_driver_class = mongodb.ControlDriver def setUp(self): super(MongodbFlavorsTest, self).setUp() diff --git a/tests/unit/storage/test_impl_redis.py b/tests/unit/storage/test_impl_redis.py index 9f9129ece..d19eb573d 100644 --- a/tests/unit/storage/test_impl_redis.py +++ b/tests/unit/storage/test_impl_redis.py @@ -24,6 +24,7 @@ import redis from zaqar.common import errors from zaqar.openstack.common.cache import cache as oslo_cache from zaqar import storage +from zaqar.storage import mongodb from zaqar.storage.redis import controllers from zaqar.storage.redis import driver from zaqar.storage.redis import messages @@ -173,7 +174,9 @@ class RedisDriverTest(testing.TestBase): def test_db_instance(self): cache = oslo_cache.get_cache() - redis_driver = driver.DataDriver(self.conf, cache) + redis_driver = driver.DataDriver(self.conf, cache, + driver.ControlDriver + (self.conf, cache)) self.assertTrue(isinstance(redis_driver.connection, redis.StrictRedis)) @@ -183,12 +186,14 @@ class RedisDriverTest(testing.TestBase): with mock.patch('redis.StrictRedis.info') as info: info.return_value = {'redis_version': '2.4.6'} self.assertRaises(RuntimeError, driver.DataDriver, - self.conf, cache) + self.conf, cache, + driver.ControlDriver(self.conf, cache)) info.return_value = {'redis_version': '2.11'} try: - driver.DataDriver(self.conf, cache) + driver.DataDriver(self.conf, cache, + driver.ControlDriver(self.conf, cache)) except RuntimeError: self.fail('version match failed') @@ -281,6 +286,7 @@ class RedisQueuesTest(base.QueueControllerTest): driver_class = driver.DataDriver config_file = 'wsgi_redis.conf' controller_class = controllers.QueueController + control_driver_class = mongodb.ControlDriver def setUp(self): super(RedisQueuesTest, self).setUp() @@ -297,11 +303,11 @@ class RedisMessagesTest(base.MessageControllerTest): driver_class = driver.DataDriver config_file = 'wsgi_redis.conf' controller_class = controllers.MessageController + control_driver_class = mongodb.ControlDriver def setUp(self): super(RedisMessagesTest, self).setUp() self.connection = self.driver.connection - self.queue_ctrl = self.driver.queue_controller def tearDown(self): super(RedisMessagesTest, self).tearDown() @@ -309,7 +315,7 @@ class RedisMessagesTest(base.MessageControllerTest): def test_count(self): queue_name = 'get-count' - self.queue_ctrl.create(queue_name) + self.queue_controller.create(queue_name) msgs = [{ 'ttl': 300, @@ -325,13 +331,13 @@ class RedisMessagesTest(base.MessageControllerTest): def test_empty_queue_exception(self): queue_name = 'empty-queue-test' - self.queue_ctrl.create(queue_name) + self.queue_controller.create(queue_name) self.assertRaises(storage.errors.QueueIsEmpty, self.controller.first, queue_name) def test_gc(self): - self.queue_ctrl.create(self.queue_name) + self.queue_controller.create(self.queue_name) self.controller.post(self.queue_name, [{'ttl': 0, 'body': {}}], client_uuid=str(uuid.uuid4())) @@ -353,12 +359,11 @@ class RedisClaimsTest(base.ClaimControllerTest): driver_class = driver.DataDriver config_file = 'wsgi_redis.conf' controller_class = controllers.ClaimController + control_driver_class = mongodb.ControlDriver def setUp(self): super(RedisClaimsTest, self).setUp() self.connection = self.driver.connection - self.queue_ctrl = self.driver.queue_controller - self.message_ctrl = self.driver.message_controller def tearDown(self): super(RedisClaimsTest, self).tearDown() @@ -367,7 +372,7 @@ class RedisClaimsTest(base.ClaimControllerTest): def test_claim_doesnt_exist(self): queue_name = 'no-such-claim' epoch = '000000000000000000000000' - self.queue_ctrl.create(queue_name) + self.queue_controller.create(queue_name) self.assertRaises(storage.errors.ClaimDoesNotExist, self.controller.get, queue_name, epoch, project=None) @@ -383,12 +388,12 @@ class RedisClaimsTest(base.ClaimControllerTest): claim_id, {}, project=None) def test_gc(self): - self.queue_ctrl.create(self.queue_name) + self.queue_controller.create(self.queue_name) for _ in range(100): - self.message_ctrl.post(self.queue_name, - [{'ttl': 300, 'body': 'yo gabba'}], - client_uuid=str(uuid.uuid4())) + self.message_controller.post(self.queue_name, + [{'ttl': 300, 'body': 'yo gabba'}], + client_uuid=str(uuid.uuid4())) now = timeutils.utcnow_ts() timeutils_utcnow = 'oslo_utils.timeutils.utcnow_ts' diff --git a/tests/unit/storage/test_pool_catalog.py b/tests/unit/storage/test_pool_catalog.py index ec5f97158..55e313b25 100644 --- a/tests/unit/storage/test_pool_catalog.py +++ b/tests/unit/storage/test_pool_catalog.py @@ -65,7 +65,7 @@ class PoolCatalogTest(testing.TestBase): def test_lookup_loads_correct_driver(self): storage = self.catalog.lookup(self.queue, self.project) - self.assertIsInstance(storage, mongodb.DataDriver) + self.assertIsInstance(storage._storage, mongodb.DataDriver) def test_lookup_returns_none_if_queue_not_mapped(self): self.assertIsNone(self.catalog.lookup('not', 'mapped')) @@ -77,14 +77,14 @@ class PoolCatalogTest(testing.TestBase): def test_register_leads_to_successful_lookup(self): self.catalog.register('not_yet', 'mapped') storage = self.catalog.lookup('not_yet', 'mapped') - self.assertIsInstance(storage, mongodb.DataDriver) + self.assertIsInstance(storage._storage, mongodb.DataDriver) def test_register_with_flavor(self): queue = 'test' self.catalog.register(queue, project=self.project, flavor=self.flavor) storage = self.catalog.lookup(queue, self.project) - self.assertIsInstance(storage, mongodb.DataDriver) + self.assertIsInstance(storage._storage, mongodb.DataDriver) def test_register_with_fake_flavor(self): self.assertRaises(errors.FlavorDoesNotExist, diff --git a/tests/unit/storage/test_pool_queues.py b/tests/unit/storage/test_pool_queues.py deleted file mode 100644 index bb47a6bd6..000000000 --- a/tests/unit/storage/test_pool_queues.py +++ /dev/null @@ -1,99 +0,0 @@ -# Copyright (c) 2013 Rackspace, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not -# use this file except in compliance with the License. You may obtain a copy -# of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations under -# the License. - -import random -import uuid - -import six - -from zaqar.openstack.common.cache import cache as oslo_cache -from zaqar.storage import pooling -from zaqar.storage import utils -from zaqar import tests as testing - - -@testing.requires_mongodb -class PoolQueuesTest(testing.TestBase): - - config_file = 'wsgi_mongodb_pooled.conf' - - def setUp(self): - super(PoolQueuesTest, self).setUp() - - cache = oslo_cache.get_cache() - control = utils.load_storage_driver(self.conf, cache, - control_mode=True) - self.pools_ctrl = control.pools_controller - self.driver = pooling.DataDriver(self.conf, cache, control) - self.controller = self.driver.queue_controller - - # fake two pools - for i in six.moves.xrange(2): - options = {'database': "zaqar_test_pools_" + str(i)} - self.pools_ctrl.create(str(uuid.uuid1()), 100, - 'mongodb://localhost:27017', - options=options) - - def tearDown(self): - self.pools_ctrl.drop_all() - super(PoolQueuesTest, self).tearDown() - - def test_ping(self): - ping = self.driver.is_alive() - self.assertTrue(ping) - - def test_listing(self): - project = "I.G" - - interaction = self.controller.list(project=project, - detailed=False) - queues = list(next(interaction)) - - self.assertEqual(len(queues), 0) - - for n in six.moves.xrange(10): - name = 'queue_%d' % n - self.controller.create(name, project=project) - self.controller.set_metadata(name, - metadata=random.getrandbits(12), - project=project) - - interaction = self.controller.list(project=project, - detailed=True, - limit=7) - queues.extend(next(interaction)) - marker = next(interaction) - - self.assertEqual(len(queues), 7) - - interaction = self.controller.list(project=project, - detailed=True, - limit=7, - marker=marker) - queues.extend(next(interaction)) - - self.assertEqual(len(queues), 10) - - # ordered by name as a whole - self.assertTrue(all(queues[i]['name'] <= queues[i + 1]['name'] - for i in six.moves.xrange(len(queues) - 1))) - - for n in six.moves.xrange(10): - self.controller.delete('queue_%d' % n, project=project) - - interaction = self.controller.list(project=project, - detailed=False) - queues = list(next(interaction)) - - self.assertEqual(len(queues), 0) diff --git a/zaqar/bootstrap.py b/zaqar/bootstrap.py index aae7934ba..8443c2381 100644 --- a/zaqar/bootstrap.py +++ b/zaqar/bootstrap.py @@ -103,17 +103,17 @@ class Bootstrap(object): @decorators.lazy_property(write=False) def storage(self): LOG.debug(u'Loading storage driver') - if self.conf.pooling: LOG.debug(u'Storage pooling enabled') storage_driver = pooling.DataDriver(self.conf, self.cache, self.control) else: storage_driver = storage_utils.load_storage_driver( - self.conf, self.cache) + self.conf, self.cache, control_driver=self.control) LOG.debug(u'Loading storage pipeline') - return pipeline.DataDriver(self.conf, storage_driver) + return pipeline.DataDriver(self.conf, storage_driver, + self.control) @decorators.lazy_property(write=False) def control(self): diff --git a/zaqar/storage/base.py b/zaqar/storage/base.py index 53641d463..a592c280e 100644 --- a/zaqar/storage/base.py +++ b/zaqar/storage/base.py @@ -25,6 +25,7 @@ import enum from oslo_config import cfg import six +from zaqar.common import decorators import zaqar.openstack.common.log as logging from zaqar.storage import errors from zaqar.storage import utils @@ -95,8 +96,11 @@ class DataDriverBase(DriverBase): BASE_CAPABILITIES = [] - def __init__(self, conf, cache): + def __init__(self, conf, cache, control_driver): super(DataDriverBase, self).__init__(conf, cache) + # creating ControlDriver instance for accessing QueueController's + # data from DataDriver + self.control_driver = control_driver @abc.abstractmethod def is_alive(self): @@ -195,6 +199,9 @@ class DataDriverBase(DriverBase): _handle_status('delete_claim', func) # delete queue + func = functools.partial(self.message_controller.bulk_delete, + queue, msg_ids, project=project) + _handle_status('bulk_delete_messages', func) func = functools.partial(self.queue_controller.delete, queue, project=project) _handle_status('delete_queue', func) @@ -211,10 +218,9 @@ class DataDriverBase(DriverBase): """ pass - @abc.abstractproperty + @decorators.lazy_property(write=False) def queue_controller(self): - """Returns the driver's queue controller.""" - raise NotImplementedError + return self.control_driver.queue_controller @abc.abstractproperty def message_controller(self): @@ -265,6 +271,11 @@ class ControlDriverBase(DriverBase): """Returns storage's flavor management controller.""" raise NotImplementedError + @abc.abstractproperty + def queue_controller(self): + """Returns the driver's queue controller.""" + raise NotImplementedError + class ControllerBase(object): """Top-level class for controllers. diff --git a/zaqar/storage/mongodb/driver.py b/zaqar/storage/mongodb/driver.py index d83b7efa1..c19cb41b1 100644 --- a/zaqar/storage/mongodb/driver.py +++ b/zaqar/storage/mongodb/driver.py @@ -78,8 +78,8 @@ class DataDriver(storage.DataDriverBase): _COL_SUFIX = "_messages_p" - def __init__(self, conf, cache): - super(DataDriver, self).__init__(conf, cache) + def __init__(self, conf, cache, control_driver): + super(DataDriver, self).__init__(conf, cache, control_driver) self.mongodb_conf = self.conf[options.MESSAGE_MONGODB_GROUP] @@ -146,17 +146,6 @@ class DataDriver(storage.DataDriverBase): KPI['message_volume'] = message_volume return KPI - @decorators.lazy_property(write=False) - def queues_database(self): - """Database dedicated to the "queues" collection. - - The queues collection is separated out into its own database - to avoid writer lock contention with the messages collections. - """ - - name = self.mongodb_conf.database + '_queues' - return self.connection[name] - @decorators.lazy_property(write=False) def message_databases(self): """List of message databases, ordered by partition number.""" @@ -185,10 +174,6 @@ class DataDriver(storage.DataDriverBase): """MongoDB client connection instance.""" return _connection(self.mongodb_conf) - @decorators.lazy_property(write=False) - def queue_controller(self): - return controllers.QueueController(self) - @decorators.lazy_property(write=False) def message_controller(self): return controllers.MessageController(self) @@ -236,6 +221,21 @@ class ControlDriver(storage.ControlDriverBase): name = self.mongodb_conf.database return self.connection[name] + @decorators.lazy_property(write=False) + def queues_database(self): + """Database dedicated to the "queues" collection. + + The queues collection is separated out into its own database + to avoid writer lock contention with the messages collections. + """ + + name = self.mongodb_conf.database + '_queues' + return self.connection[name] + + @decorators.lazy_property(write=False) + def queue_controller(self): + return controllers.QueueController(self) + @property def pools_controller(self): return controllers.PoolsController(self) diff --git a/zaqar/storage/mongodb/messages.py b/zaqar/storage/mongodb/messages.py index 3dd3fb071..23904b7fd 100644 --- a/zaqar/storage/mongodb/messages.py +++ b/zaqar/storage/mongodb/messages.py @@ -862,3 +862,64 @@ def _basic_message(msg, now): 'body': msg['b'], 'claim_id': str(msg['c']['id']) if msg['c']['id'] else None } + + +# NOTE(kgriffs): E.g.: 'queuecontroller:exists:5083853/my-queue' +_QUEUE_CACHE_PREFIX = 'queuecontroller:' + +_QUEUE_CACHE_TTL = 5 + + +def _queue_exists_key(queue, project=None): + # NOTE(kgriffs): Use string concatenation for performance, + # also put project first since it is guaranteed to be + # unique, which should reduce lookup time. + return _QUEUE_CACHE_PREFIX + 'exists:' + str(project) + '/' + queue + + +class MessageQueueHandler(object): + + def __init__(self, driver, control_driver): + self.driver = driver + self._cache = self.driver.cache + self.queue_controller = self.driver.queue_controller + self.message_controller = self.driver.message_controller + + def delete(self, queue_name, project=None): + self.message_controller._purge_queue(queue_name, project) + + @utils.raises_conn_error + @utils.retries_on_autoreconnect + def stats(self, name, project=None): + if not self.queue_controller.exists(name, project=project): + raise errors.QueueDoesNotExist(name, project) + + controller = self.message_controller + + active = controller._count(name, project=project, + include_claimed=False) + + total = controller._count(name, project=project, + include_claimed=True) + + message_stats = { + 'claimed': total - active, + 'free': active, + 'total': total, + } + + try: + oldest = controller.first(name, project=project, sort=1) + newest = controller.first(name, project=project, sort=-1) + except errors.QueueIsEmpty: + pass + else: + now = timeutils.utcnow_ts() + message_stats['oldest'] = utils.stat_message(oldest, now) + message_stats['newest'] = utils.stat_message(newest, now) + + return {'messages': message_stats} + + +def _get_scoped_query(name, project): + return {'p_q': utils.scope_queue_name(name, project)} diff --git a/zaqar/storage/mongodb/queues.py b/zaqar/storage/mongodb/queues.py index 69d15444c..d74a29199 100644 --- a/zaqar/storage/mongodb/queues.py +++ b/zaqar/storage/mongodb/queues.py @@ -278,39 +278,12 @@ class QueueController(storage.Queue): @utils.retries_on_autoreconnect @_exists.purges def _delete(self, name, project=None): - self.driver.message_controller._purge_queue(name, project) self._collection.remove(_get_scoped_query(name, project)) @utils.raises_conn_error @utils.retries_on_autoreconnect def _stats(self, name, project=None): - if not self.exists(name, project=project): - raise errors.QueueDoesNotExist(name, project) - - controller = self.driver.message_controller - - active = controller._count(name, project=project, - include_claimed=False) - total = controller._count(name, project=project, - include_claimed=True) - - message_stats = { - 'claimed': total - active, - 'free': active, - 'total': total, - } - - try: - oldest = controller.first(name, project=project, sort=1) - newest = controller.first(name, project=project, sort=-1) - except errors.QueueIsEmpty: - pass - else: - now = timeutils.utcnow_ts() - message_stats['oldest'] = utils.stat_message(oldest, now) - message_stats['newest'] = utils.stat_message(newest, now) - - return {'messages': message_stats} + pass def _get_scoped_query(name, project): diff --git a/zaqar/storage/mongodb/subscriptions.py b/zaqar/storage/mongodb/subscriptions.py index a41a2cab5..ecc2f7014 100644 --- a/zaqar/storage/mongodb/subscriptions.py +++ b/zaqar/storage/mongodb/subscriptions.py @@ -47,7 +47,8 @@ class SubscriptionController(base.Subscription): def __init__(self, *args, **kwargs): super(SubscriptionController, self).__init__(*args, **kwargs) self._collection = self.driver.subscriptions_database.subscriptions - self._queue_collection = self.driver.queues_database.queues + queue_col = self.driver.control_driver.queues_database.queues + self._queue_collection = queue_col self._collection.ensure_index(SUBSCRIPTIONS_INDEX, unique=True) @utils.raises_conn_error diff --git a/zaqar/storage/pipeline.py b/zaqar/storage/pipeline.py index b9ccb4b57..c843be58a 100644 --- a/zaqar/storage/pipeline.py +++ b/zaqar/storage/pipeline.py @@ -85,7 +85,7 @@ def _get_storage_pipeline(resource_name, conf, *args, **kwargs): return pipeline -def _get_builtin_entry_points(resource_name, storage): +def _get_builtin_entry_points(resource_name, storage, control_driver): # Load builtin stages builtin_entry_points = [] @@ -96,7 +96,8 @@ def _get_builtin_entry_points(resource_name, storage): namespace = '%s.%s.stages' % (storage.__module__, resource_name) extensions = extension.ExtensionManager(namespace, invoke_on_load=True, - invoke_args=[storage]) + invoke_args=[storage, + control_driver]) if len(extensions.extensions) == 0: return [] @@ -115,10 +116,10 @@ class DataDriver(base.DataDriverBase): last step in the pipeline """ - def __init__(self, conf, storage): + def __init__(self, conf, storage, control_driver): # NOTE(kgriffs): Pass None for cache since it won't ever # be referenced. - super(DataDriver, self).__init__(conf, None) + super(DataDriver, self).__init__(conf, None, control_driver) self._storage = storage @property @@ -133,14 +134,16 @@ class DataDriver(base.DataDriverBase): @decorators.lazy_property(write=False) def queue_controller(self): - stages = _get_builtin_entry_points('queue', self._storage) + stages = _get_builtin_entry_points('queue', self._storage, + self.control_driver) stages.extend(_get_storage_pipeline('queue', self.conf)) stages.append(self._storage.queue_controller) return common.Pipeline(stages) @decorators.lazy_property(write=False) def message_controller(self): - stages = _get_builtin_entry_points('message', self._storage) + stages = _get_builtin_entry_points('message', self._storage, + self.control_driver) kwargs = {'subscription_controller': self._storage.subscription_controller} stages.extend(_get_storage_pipeline('message', self.conf, **kwargs)) @@ -149,14 +152,16 @@ class DataDriver(base.DataDriverBase): @decorators.lazy_property(write=False) def claim_controller(self): - stages = _get_builtin_entry_points('claim', self._storage) + stages = _get_builtin_entry_points('claim', self._storage, + self.control_driver) stages.extend(_get_storage_pipeline('claim', self.conf)) stages.append(self._storage.claim_controller) return common.Pipeline(stages) @decorators.lazy_property(write=False) def subscription_controller(self): - stages = _get_builtin_entry_points('subscription', self._storage) + stages = _get_builtin_entry_points('subscription', self._storage, + self.control_driver) stages.extend(_get_storage_pipeline('subscription', self.conf)) stages.append(self._storage.subscription_controller) return common.Pipeline(stages) diff --git a/zaqar/storage/pooling.py b/zaqar/storage/pooling.py index d3f336878..deefcd13a 100644 --- a/zaqar/storage/pooling.py +++ b/zaqar/storage/pooling.py @@ -23,6 +23,7 @@ from zaqar.common.storage import select from zaqar.openstack.common import log from zaqar import storage from zaqar.storage import errors +from zaqar.storage import pipeline from zaqar.storage import utils LOG = log.getLogger(__name__) @@ -69,8 +70,8 @@ class DataDriver(storage.DataDriverBase): BASE_CAPABILITIES = tuple(storage.Capabilities) - def __init__(self, conf, cache, control): - super(DataDriver, self).__init__(conf, cache) + def __init__(self, conf, cache, control, control_driver=None): + super(DataDriver, self).__init__(conf, cache, control_driver) self._pool_catalog = Catalog(conf, cache, control) @property @@ -404,6 +405,7 @@ class Catalog(object): self._drivers = {} self._conf = conf self._cache = cache + self.control = control self._conf.register_opts(_CATALOG_OPTIONS, group=_CATALOG_GROUP) self._catalog_conf = self._conf[_CATALOG_GROUP] @@ -424,7 +426,10 @@ class Catalog(object): pool = self._pools_ctrl.get(pool_id, detailed=True) conf = utils.dynamic_conf(pool['uri'], pool['options'], conf=self._conf) - return utils.load_storage_driver(conf, self._cache) + storage = utils.load_storage_driver(conf, + self._cache, + control_driver=self.control) + return pipeline.DataDriver(conf, storage, self.control) @decorators.caches(_pool_cache_key, _POOL_CACHE_TTL) def _pool_id(self, queue, project=None): diff --git a/zaqar/storage/redis/driver.py b/zaqar/storage/redis/driver.py index 3ec92fa87..b2976d655 100644 --- a/zaqar/storage/redis/driver.py +++ b/zaqar/storage/redis/driver.py @@ -149,8 +149,8 @@ class DataDriver(storage.DataDriverBase): _DRIVER_OPTIONS = options._config_options() - def __init__(self, conf, cache): - super(DataDriver, self).__init__(conf, cache) + def __init__(self, conf, cache, control_driver): + super(DataDriver, self).__init__(conf, cache, control_driver) self.redis_conf = self.conf[options.MESSAGE_REDIS_GROUP] server_version = self.connection.info()['redis_version'] @@ -194,10 +194,6 @@ class DataDriver(storage.DataDriverBase): """Redis client connection instance.""" return _get_redis_client(self) - @decorators.lazy_property(write=False) - def queue_controller(self): - return controllers.QueueController(self) - @decorators.lazy_property(write=False) def message_controller(self): return controllers.MessageController(self) @@ -226,6 +222,10 @@ class ControlDriver(storage.ControlDriverBase): """Redis client connection instance.""" return _get_redis_client(self) + @decorators.lazy_property(write=False) + def queue_controller(self): + return controllers.QueueController(self) + @property def pools_controller(self): raise NotImplementedError() diff --git a/zaqar/storage/redis/messages.py b/zaqar/storage/redis/messages.py index 094fa6816..97776a8e9 100644 --- a/zaqar/storage/redis/messages.py +++ b/zaqar/storage/redis/messages.py @@ -17,6 +17,7 @@ import uuid from oslo_utils import encodeutils from oslo_utils import timeutils +import redis from zaqar.common import decorators from zaqar import storage @@ -558,3 +559,63 @@ def _filter_messages(messages, filters, to_basic, marker): yield msg.to_basic(now) else: yield msg + +QUEUES_SET_STORE_NAME = 'queues_set' + + +class MessageQueueHandler(object): + def __init__(self, driver, control_driver): + self.driver = driver + self._client = self.driver.connection + self._queue_ctrl = self.driver.queue_controller + self._message_ctrl = self.driver.message_controller + self._claim_ctrl = self.driver.claim_controller + + @utils.raises_conn_error + def create(self, name, metadata=None, project=None): + with self._client.pipeline() as pipe: + self._message_ctrl._create_msgset(name, project, pipe) + + try: + pipe.execute() + except redis.exceptions.ResponseError: + return False + + @utils.raises_conn_error + @utils.retries_on_connection_error + def delete(self, name, project=None): + with self._client.pipeline() as pipe: + self._message_ctrl._delete_msgset(name, project, pipe) + self._message_ctrl._delete_queue_messages(name, project, pipe) + pipe.execute() + + @utils.raises_conn_error + @utils.retries_on_connection_error + def stats(self, name, project=None): + if not self._queue_ctrl.exists(name, project=project): + raise errors.QueueDoesNotExist(name, project) + + total = self._message_ctrl._count(name, project) + + if total: + claimed = self._claim_ctrl._count_messages(name, project) + else: + claimed = 0 + + message_stats = { + 'claimed': claimed, + 'free': total - claimed, + 'total': total, + } + + if total: + try: + newest = self._message_ctrl.first(name, project, -1) + oldest = self._message_ctrl.first(name, project, 1) + except errors.QueueIsEmpty: + pass + else: + message_stats['newest'] = newest + message_stats['oldest'] = oldest + + return {'messages': message_stats} diff --git a/zaqar/storage/redis/queues.py b/zaqar/storage/redis/queues.py index 9a11dc61d..28ac20cf9 100644 --- a/zaqar/storage/redis/queues.py +++ b/zaqar/storage/redis/queues.py @@ -70,10 +70,6 @@ class QueueController(storage.Queue): use_bin_type=True).pack self._unpacker = functools.partial(msgpack.unpackb, encoding='utf-8') - @decorators.lazy_property(write=False) - def _message_ctrl(self): - return self.driver.message_controller - @decorators.lazy_property(write=False) def _claim_ctrl(self): return self.driver.claim_controller @@ -124,7 +120,7 @@ class QueueController(storage.Queue): qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project) # Check if the queue already exists. - if self.exists(name, project): + if self._exists(name, project): return False queue = { @@ -137,7 +133,6 @@ class QueueController(storage.Queue): # Pipeline ensures atomic inserts. with self._client.pipeline() as pipe: pipe.zadd(qset_key, 1, queue_key).hmset(queue_key, queue) - self._message_ctrl._create_msgset(name, project, pipe) try: pipe.execute() @@ -187,38 +182,9 @@ class QueueController(storage.Queue): with self._client.pipeline() as pipe: pipe.zrem(qset_key, queue_key) pipe.delete(queue_key) - self._message_ctrl._delete_msgset(name, project, pipe) - self._message_ctrl._delete_queue_messages(name, project, pipe) - pipe.execute() @utils.raises_conn_error @utils.retries_on_connection_error def _stats(self, name, project=None): - if not self.exists(name, project=project): - raise errors.QueueDoesNotExist(name, project) - - total = self._message_ctrl._count(name, project) - - if total: - claimed = self._claim_ctrl._count_messages(name, project) - else: - claimed = 0 - - message_stats = { - 'claimed': claimed, - 'free': total - claimed, - 'total': total, - } - - if total: - try: - newest = self._message_ctrl.first(name, project, -1) - oldest = self._message_ctrl.first(name, project, 1) - except errors.QueueIsEmpty: - pass - else: - message_stats['newest'] = newest - message_stats['oldest'] = oldest - - return {'messages': message_stats} + pass diff --git a/zaqar/storage/sqlalchemy/claims.py b/zaqar/storage/sqlalchemy/claims.py index a2c08e7c8..7400fecd6 100644 --- a/zaqar/storage/sqlalchemy/claims.py +++ b/zaqar/storage/sqlalchemy/claims.py @@ -88,7 +88,7 @@ class ClaimController(storage.Claim): with self.driver.trans() as trans: try: - qid = utils.get_qid(self.driver, queue, project) + qid = utils.get_qid(self.driver.control_driver, queue, project) except errors.QueueDoesNotExist: return None, iter([]) @@ -136,7 +136,7 @@ class ClaimController(storage.Claim): age = utils.get_age(tables.Claims.c.created) with self.driver.trans() as trans: - qid = utils.get_qid(self.driver, queue, project) + qid = utils.get_qid(self.driver.control_driver, queue, project) update = tables.Claims.update().where(sa.and_( tables.Claims.c.ttl > age, @@ -168,7 +168,7 @@ class ClaimController(storage.Claim): try: # NOTE(flaper87): This could probably use some # joins and be just 1 query. - qid = utils.get_qid(self.driver, queue, project) + qid = utils.get_qid(self.driver.control_driver, queue, project) except errors.QueueDoesNotExist: return diff --git a/zaqar/storage/sqlalchemy/messages.py b/zaqar/storage/sqlalchemy/messages.py index 7cf7adcc8..21fe38f6a 100644 --- a/zaqar/storage/sqlalchemy/messages.py +++ b/zaqar/storage/sqlalchemy/messages.py @@ -133,7 +133,8 @@ class MessageController(storage.Message): if project is None: project = '' - qid = utils.get_qid(self.driver, queue, project) + qid = utils.get_qid(self.driver.control_driver, + queue, project) sel = sa.sql.select([tables.Messages.c.id, tables.Messages.c.body, @@ -230,7 +231,8 @@ class MessageController(storage.Message): project = '' with self.driver.trans() as trans: - qid = utils.get_qid(self.driver, queue, project) + qid = utils.get_qid(self.driver.control_driver, + queue, project) # Delete the expired messages and_stmt = sa.and_(tables.Messages.c.ttl <= @@ -310,7 +312,8 @@ class MessageController(storage.Message): with self.driver.trans() as trans: try: - qid = utils.get_qid(self.driver, queue, project) + qid = utils.get_qid(self.driver.control_driver, + queue, project) except errors.QueueDoesNotExist: return @@ -359,7 +362,8 @@ class MessageController(storage.Message): statement = tables.Messages.delete() - qid = utils.get_qid(self.driver, queue_name, project) + qid = utils.get_qid(self.driver.control_driver, + queue_name, project) and_stmt = [tables.Messages.c.id.in_(message_ids), tables.Messages.c.qid == qid] @@ -367,3 +371,50 @@ class MessageController(storage.Message): trans.execute(statement.where(sa.and_(*and_stmt))) return messages + + +class MessageQueueHandler(object): + def __init__(self, driver, control_driver): + self.driver = driver + + def stats(self, name, project): + if project is None: + project = '' + + qid = utils.get_qid(self.driver.control_driver, name, project) + sel = sa.sql.select([ + sa.sql.select([sa.func.count(tables.Messages.c.id)], + sa.and_( + tables.Messages.c.qid == qid, + tables.Messages.c.cid != (None), + tables.Messages.c.ttl > + sfunc.now() - tables.Messages.c.created)), + sa.sql.select([sa.func.count(tables.Messages.c.id)], + sa.and_( + tables.Messages.c.qid == qid, + tables.Messages.c.cid == (None), + tables.Messages.c.ttl > + sfunc.now() - tables.Messages.c.created)) + ]) + + claimed, free = self.driver.get(sel) + + total = free + claimed + + message_stats = { + 'claimed': claimed, + 'free': free, + 'total': total, + } + + try: + message_controller = self.driver.message_controller + oldest = message_controller.first(name, project, sort=1) + newest = message_controller.first(name, project, sort=-1) + except errors.QueueIsEmpty: + pass + else: + message_stats['oldest'] = utils.stat_message(oldest) + message_stats['newest'] = utils.stat_message(newest) + + return {'messages': message_stats} diff --git a/zaqar/storage/sqlalchemy/queues.py b/zaqar/storage/sqlalchemy/queues.py index 18bcd9346..0125c4d8b 100644 --- a/zaqar/storage/sqlalchemy/queues.py +++ b/zaqar/storage/sqlalchemy/queues.py @@ -13,7 +13,6 @@ # the License. import sqlalchemy as sa -from sqlalchemy.sql import func as sfunc from zaqar import storage from zaqar.storage import errors @@ -130,43 +129,4 @@ class QueueController(storage.Queue): self.driver.run(dlt) def _stats(self, name, project): - if project is None: - project = '' - - qid = utils.get_qid(self.driver, name, project) - sel = sa.sql.select([ - sa.sql.select([sa.func.count(tables.Messages.c.id)], - sa.and_( - tables.Messages.c.qid == qid, - tables.Messages.c.cid != (None), - tables.Messages.c.ttl > - sfunc.now() - tables.Messages.c.created)), - sa.sql.select([sa.func.count(tables.Messages.c.id)], - sa.and_( - tables.Messages.c.qid == qid, - tables.Messages.c.cid == (None), - tables.Messages.c.ttl > - sfunc.now() - tables.Messages.c.created)) - ]) - - claimed, free = self.driver.get(sel) - - total = free + claimed - - message_stats = { - 'claimed': claimed, - 'free': free, - 'total': total, - } - - try: - message_controller = self.driver.message_controller - oldest = message_controller.first(name, project, sort=1) - newest = message_controller.first(name, project, sort=-1) - except errors.QueueIsEmpty: - pass - else: - message_stats['oldest'] = utils.stat_message(oldest) - message_stats['newest'] = utils.stat_message(newest) - - return {'messages': message_stats} + pass diff --git a/zaqar/storage/utils.py b/zaqar/storage/utils.py index 84ad0ee2a..2271d4f8b 100644 --- a/zaqar/storage/utils.py +++ b/zaqar/storage/utils.py @@ -95,7 +95,8 @@ def load_storage_impl(uri, control_mode=False, default_store=None): raise errors.InvalidDriver(exc) -def load_storage_driver(conf, cache, storage_type=None, control_mode=False): +def load_storage_driver(conf, cache, storage_type=None, + control_mode=False, control_driver=None): """Loads a storage driver and returns it. The driver's initializer will be passed conf and cache as @@ -110,17 +111,24 @@ def load_storage_driver(conf, cache, storage_type=None, control_mode=False): :param control_mode: (Default False). Determines which driver type to load; if False, the data driver is loaded. If True, the control driver is loaded. + :param control_driver: (Default None). The control driver + instance to pass to the storage driver. Needed to access + the queue controller, mainly. """ mode = 'control' if control_mode else 'data' driver_type = 'zaqar.{0}.storage'.format(mode) storage_type = storage_type or conf['drivers'].storage + _invoke_args = [conf, cache] + if control_driver is not None: + _invoke_args.append(control_driver) + try: mgr = driver.DriverManager(driver_type, storage_type, invoke_on_load=True, - invoke_args=[conf, cache]) + invoke_args=_invoke_args) return mgr.driver @@ -178,7 +186,11 @@ def can_connect(uri, conf=None): # the URI field. This should be sufficient to initialize a # storage driver. driver = load_storage_driver(conf, None, - storage_type=storage_type) + storage_type=storage_type, + control_driver=load_storage_driver + (conf, None, + storage_type=storage_type, + control_mode=True)) return driver.is_alive() except Exception as exc: LOG.debug('Can\'t connect to: %s \n%s' % (uri, exc)) diff --git a/zaqar/tests/faulty_storage.py b/zaqar/tests/faulty_storage.py index a4a98ed33..c823dbaa1 100644 --- a/zaqar/tests/faulty_storage.py +++ b/zaqar/tests/faulty_storage.py @@ -17,8 +17,8 @@ from zaqar import storage class DataDriver(storage.DataDriverBase): - def __init__(self, conf, cache): - super(DataDriver, self).__init__(conf, cache) + def __init__(self, conf, cache, control_driver): + super(DataDriver, self).__init__(conf, cache, control_driver) @property def default_options(self): @@ -36,7 +36,7 @@ class DataDriver(storage.DataDriverBase): @property def queue_controller(self): - return QueueController(self) + return self.control_driver.queue_controller @property def message_controller(self): @@ -56,6 +56,10 @@ class ControlDriver(storage.ControlDriverBase): def __init__(self, conf, cache): super(ControlDriver, self).__init__(conf, cache) + @property + def queue_controller(self): + return QueueController(self) + @property def catalogue_controller(self): return None diff --git a/zaqar/tests/unit/storage/base.py b/zaqar/tests/unit/storage/base.py index b71032eca..ee4bee102 100644 --- a/zaqar/tests/unit/storage/base.py +++ b/zaqar/tests/unit/storage/base.py @@ -28,6 +28,7 @@ from testtools import matchers from zaqar.openstack.common.cache import cache as oslo_cache from zaqar import storage from zaqar.storage import errors +from zaqar.storage import pipeline from zaqar import tests as testing from zaqar.tests import helpers @@ -58,18 +59,21 @@ class ControllerBaseTest(testing.TestBase): self.skipTest("Pooling is enabled, " "but control driver class is not specified") + self.control = self.control_driver_class(self.conf, cache) if not pooling: - self.driver = self.driver_class(self.conf, cache) + args = [self.conf, cache] + if issubclass(self.driver_class, storage.DataDriverBase): + args.append(self.control) + self.driver = self.driver_class(*args) else: - control = self.control_driver_class(self.conf, cache) uri = "mongodb://localhost:27017" for i in range(4): options = {'database': "zaqar_test_pools_" + str(i)} - control.pools_controller.create(six.text_type(i), - 100, uri, options=options) - self.driver = self.driver_class(self.conf, cache, control) - self.addCleanup(control.pools_controller.drop_all) - self.addCleanup(control.catalogue_controller.drop_all) + self.control.pools_controller.create(six.text_type(i), + 100, uri, options=options) + self.driver = self.driver_class(self.conf, cache, self.control) + self.addCleanup(self.control.pools_controller.drop_all) + self.addCleanup(self.control.catalogue_controller.drop_all) self._prepare_conf() @@ -80,6 +84,10 @@ class ControllerBaseTest(testing.TestBase): else: self.controller = self.controller_class(self.driver._pool_catalog) + self.pipeline = pipeline.DataDriver(self.conf, + self.driver, + self.control) + def _prepare_conf(self): """Prepare the conf before running tests @@ -98,9 +106,7 @@ class QueueControllerTest(ControllerBaseTest): def setUp(self): super(QueueControllerTest, self).setUp() - self.queue_controller = self.driver.queue_controller - self.message_controller = self.driver.message_controller - self.claim_controller = self.driver.claim_controller + self.queue_controller = self.pipeline.queue_controller @ddt.data(None, ControllerBaseTest.project) def test_list(self, project): @@ -164,10 +170,144 @@ class QueueControllerTest(ControllerBaseTest): metadata = self.controller.get('test', project=self.project) self.assertEqual(metadata['meta'], 'test_meta') + # Test queue deletion + self.controller.delete('test', project=self.project) + + # Test queue existence + self.assertFalse(self.controller.exists('test', project=self.project)) + + +class MessageControllerTest(ControllerBaseTest): + """Message Controller base tests. + + NOTE(flaper87): Implementations of this class should + override the tearDown method in order + to clean up storage's state. + """ + queue_name = 'test_queue' + controller_base_class = storage.Message + + # Specifies how often expired messages are purged, in sec. + gc_interval = 0 + + def setUp(self): + super(MessageControllerTest, self).setUp() + + # Lets create a queue + self.queue_controller = self.pipeline.queue_controller + self.claim_controller = self.pipeline.claim_controller + self.queue_controller.create(self.queue_name, project=self.project) + + def tearDown(self): + self.queue_controller.delete(self.queue_name, project=self.project) + super(MessageControllerTest, self).tearDown() + + def test_stats_for_empty_queue(self): + self.addCleanup(self.queue_controller.delete, 'test', + project=self.project) + created = self.queue_controller.create('test', project=self.project) + self.assertTrue(created) + + stats = self.queue_controller.stats('test', project=self.project) + message_stats = stats['messages'] + + self.assertEqual(message_stats['free'], 0) + self.assertEqual(message_stats['claimed'], 0) + self.assertEqual(message_stats['total'], 0) + + self.assertNotIn('newest', message_stats) + self.assertNotIn('oldest', message_stats) + + def test_queue_count_on_bulk_delete(self): + self.addCleanup(self.queue_controller.delete, 'test-queue', + project=self.project) + queue_name = 'test-queue' client_uuid = uuid.uuid4() + created = self.queue_controller.create(queue_name, + project=self.project) + self.assertTrue(created) + + # Create 10 messages. + msg_keys = _insert_fixtures(self.controller, queue_name, + project=self.project, + client_uuid=client_uuid, num=10) + + stats = self.queue_controller.stats(queue_name, + self.project)['messages'] + self.assertEqual(stats['total'], 10) + + # Delete 5 messages + self.controller.bulk_delete(queue_name, msg_keys[0:5], + self.project) + stats = self.queue_controller.stats(queue_name, + self.project)['messages'] + self.assertEqual(stats['total'], 5) + + def test_queue_count_on_bulk_delete_with_invalid_id(self): + self.addCleanup(self.queue_controller.delete, 'test-queue', + project=self.project) + queue_name = 'test-queue' + client_uuid = uuid.uuid4() + + created = self.queue_controller.create(queue_name, + project=self.project) + self.assertTrue(created) + + # Create 10 messages. + msg_keys = _insert_fixtures(self.controller, queue_name, + project=self.project, + client_uuid=client_uuid, num=10) + + stats = self.queue_controller.stats(queue_name, + self.project)['messages'] + self.assertEqual(stats['total'], 10) + + # Delete 5 messages + self.controller.bulk_delete(queue_name, + msg_keys[0:5] + ['invalid'], + self.project) + + stats = self.queue_controller.stats(queue_name, + self.project)['messages'] + self.assertEqual(stats['total'], 5) + + def test_queue_count_on_delete(self): + self.addCleanup(self.queue_controller.delete, 'test-queue', + project=self.project) + queue_name = 'test-queue' + client_uuid = uuid.uuid4() + + created = self.queue_controller.create(queue_name, + project=self.project) + self.assertTrue(created) + + # Create 10 messages. + msg_keys = _insert_fixtures(self.controller, queue_name, + project=self.project, + client_uuid=client_uuid, num=10) + + stats = self.queue_controller.stats(queue_name, + self.project)['messages'] + self.assertEqual(stats['total'], 10) + + # Delete 1 message + self.controller.delete(queue_name, msg_keys[0], self.project) + stats = self.queue_controller.stats(queue_name, + self.project)['messages'] + self.assertEqual(stats['total'], 9) + + def test_queue_stats(self): + # Test queue creation + self.addCleanup(self.queue_controller.delete, 'test', + project=self.project) + created = self.queue_controller.create('test', + metadata=dict(meta='test_meta'), + project=self.project) + + client_uuid = uuid.uuid4() # Test queue statistic - _insert_fixtures(self.message_controller, 'test', + _insert_fixtures(self.controller, 'test', project=self.project, client_uuid=client_uuid, num=6) @@ -176,11 +316,11 @@ class QueueControllerTest(ControllerBaseTest): # message timestamps (and may not be monkey-patchable). time.sleep(1.2) - _insert_fixtures(self.message_controller, 'test', + _insert_fixtures(self.controller, 'test', project=self.project, client_uuid=client_uuid, num=6) - stats = self.controller.stats('test', project=self.project) + stats = self.queue_controller.stats('test', project=self.project) message_stats = stats['messages'] self.assertEqual(message_stats['free'], 12) @@ -210,121 +350,23 @@ class QueueControllerTest(ControllerBaseTest): self.assertThat(oldest['created'], matchers.LessThan(newest['created'])) - # Test queue deletion - self.controller.delete('test', project=self.project) - - # Test queue existence - self.assertFalse(self.controller.exists('test', project=self.project)) - - def test_stats_for_empty_queue(self): - self.addCleanup(self.controller.delete, 'test', project=self.project) - created = self.controller.create('test', project=self.project) - self.assertTrue(created) - - stats = self.controller.stats('test', project=self.project) - message_stats = stats['messages'] - - self.assertEqual(message_stats['free'], 0) - self.assertEqual(message_stats['claimed'], 0) - self.assertEqual(message_stats['total'], 0) - - self.assertNotIn('newest', message_stats) - self.assertNotIn('oldest', message_stats) - - def test_queue_count_on_bulk_delete(self): - self.addCleanup(self.controller.delete, 'test-queue', - project=self.project) - queue_name = 'test-queue' - client_uuid = uuid.uuid4() - - created = self.controller.create(queue_name, project=self.project) - self.assertTrue(created) - - # Create 10 messages. - msg_keys = _insert_fixtures(self.message_controller, queue_name, - project=self.project, - client_uuid=client_uuid, num=10) - - stats = self.controller.stats(queue_name, - self.project)['messages'] - self.assertEqual(stats['total'], 10) - - # Delete 5 messages - self.message_controller.bulk_delete(queue_name, msg_keys[0:5], - self.project) - - stats = self.controller.stats(queue_name, - self.project)['messages'] - self.assertEqual(stats['total'], 5) - - def test_queue_count_on_bulk_delete_with_invalid_id(self): - self.addCleanup(self.controller.delete, 'test-queue', - project=self.project) - queue_name = 'test-queue' - client_uuid = uuid.uuid4() - - created = self.controller.create(queue_name, project=self.project) - self.assertTrue(created) - - # Create 10 messages. - msg_keys = _insert_fixtures(self.message_controller, queue_name, - project=self.project, - client_uuid=client_uuid, num=10) - - stats = self.controller.stats(queue_name, - self.project)['messages'] - self.assertEqual(stats['total'], 10) - - # Delete 5 messages - self.message_controller.bulk_delete(queue_name, - msg_keys[0:5] + ['invalid'], - self.project) - - stats = self.controller.stats(queue_name, - self.project)['messages'] - self.assertEqual(stats['total'], 5) - - def test_queue_count_on_delete(self): - self.addCleanup(self.controller.delete, 'test-queue', - project=self.project) - queue_name = 'test-queue' - client_uuid = uuid.uuid4() - - created = self.controller.create(queue_name, project=self.project) - self.assertTrue(created) - - # Create 10 messages. - msg_keys = _insert_fixtures(self.message_controller, queue_name, - project=self.project, - client_uuid=client_uuid, num=10) - - stats = self.controller.stats(queue_name, - self.project)['messages'] - self.assertEqual(stats['total'], 10) - - # Delete 1 message - self.message_controller.delete(queue_name, msg_keys[0], - self.project) - stats = self.controller.stats(queue_name, - self.project)['messages'] - self.assertEqual(stats['total'], 9) - def test_queue_count_on_claim_delete(self): - self.addCleanup(self.controller.delete, 'test-queue', + self.addCleanup(self.queue_controller.delete, 'test-queue', project=self.project) queue_name = 'test-queue' client_uuid = uuid.uuid4() - created = self.controller.create(queue_name, project=self.project) + created = self.queue_controller.create(queue_name, + project=self.project) self.assertTrue(created) # Create 15 messages. - msg_keys = _insert_fixtures(self.message_controller, queue_name, + msg_keys = _insert_fixtures(self.controller, queue_name, project=self.project, client_uuid=client_uuid, num=15) - stats = self.controller.stats(queue_name, - self.project)['messages'] + stats = self.queue_controller.stats(queue_name, + self.project)['messages'] self.assertEqual(stats['total'], 15) metadata = {'ttl': 120, 'grace': 60} @@ -332,25 +374,25 @@ class QueueControllerTest(ControllerBaseTest): claim_id, _ = self.claim_controller.create(queue_name, metadata, self.project) - stats = self.controller.stats(queue_name, - self.project)['messages'] + stats = self.queue_controller.stats(queue_name, + self.project)['messages'] self.assertEqual(stats['claimed'], 10) # Delete one message and ensure stats are updated even # thought the claim itself has not been deleted. - self.message_controller.delete(queue_name, msg_keys[0], - self.project, claim_id) - stats = self.controller.stats(queue_name, - self.project)['messages'] + self.controller.delete(queue_name, msg_keys[0], + self.project, claim_id) + stats = self.queue_controller.stats(queue_name, + self.project)['messages'] self.assertEqual(stats['total'], 14) self.assertEqual(stats['claimed'], 9) self.assertEqual(stats['free'], 5) # Same thing but use bulk_delete interface - self.message_controller.bulk_delete(queue_name, msg_keys[1:3], - self.project) - stats = self.controller.stats(queue_name, - self.project)['messages'] + self.controller.bulk_delete(queue_name, msg_keys[1:3], + self.project) + stats = self.queue_controller.stats(queue_name, + self.project)['messages'] self.assertEqual(stats['total'], 12) self.assertEqual(stats['claimed'], 7) self.assertEqual(stats['free'], 5) @@ -358,37 +400,11 @@ class QueueControllerTest(ControllerBaseTest): # Delete the claim self.claim_controller.delete(queue_name, claim_id, self.project) - stats = self.controller.stats(queue_name, - self.project)['messages'] + stats = self.queue_controller.stats(queue_name, + self.project)['messages'] self.assertEqual(stats['claimed'], 0) - -class MessageControllerTest(ControllerBaseTest): - """Message Controller base tests. - - NOTE(flaper87): Implementations of this class should - override the tearDown method in order - to clean up storage's state. - """ - queue_name = 'test_queue' - controller_base_class = storage.Message - - # Specifies how often expired messages are purged, in sec. - gc_interval = 0 - - def setUp(self): - super(MessageControllerTest, self).setUp() - - # Lets create a queue - self.queue_controller = self.driver.queue_controller - self.claim_controller = self.driver.claim_controller - self.queue_controller.create(self.queue_name, project=self.project) - - def tearDown(self): - self.queue_controller.delete(self.queue_name, project=self.project) - super(MessageControllerTest, self).tearDown() - def test_message_lifecycle(self): queue_name = self.queue_name @@ -729,8 +745,8 @@ class ClaimControllerTest(ControllerBaseTest): super(ClaimControllerTest, self).setUp() # Lets create a queue - self.queue_controller = self.driver.queue_controller - self.message_controller = self.driver.message_controller + self.queue_controller = self.pipeline.queue_controller + self.message_controller = self.pipeline.message_controller self.queue_controller.create(self.queue_name, project=self.project) def tearDown(self): diff --git a/zaqar/tests/unit/transport/websocket/v1_1/test_queue_lifecycle.py b/zaqar/tests/unit/transport/websocket/v1_1/test_queue_lifecycle.py index 8dafa98d3..ee113e9bb 100644 --- a/zaqar/tests/unit/transport/websocket/v1_1/test_queue_lifecycle.py +++ b/zaqar/tests/unit/transport/websocket/v1_1/test_queue_lifecycle.py @@ -579,7 +579,7 @@ class TestQueueLifecycleMongoDB(QueueLifecycleBaseTest): storage = self.boot.storage._storage connection = storage.connection - connection.drop_database(storage.queues_database) + connection.drop_database(self.boot.control.queues_database) for db in storage.message_databases: connection.drop_database(db) diff --git a/zaqar/tests/unit/transport/wsgi/v1/test_claims.py b/zaqar/tests/unit/transport/wsgi/v1/test_claims.py index d3cf87a2a..f0210d853 100644 --- a/zaqar/tests/unit/transport/wsgi/v1/test_claims.py +++ b/zaqar/tests/unit/transport/wsgi/v1/test_claims.py @@ -233,9 +233,10 @@ class TestClaimsMongoDB(ClaimsBaseTest): def tearDown(self): storage = self.boot.storage._storage + control = self.boot.control connection = storage.connection - connection.drop_database(storage.queues_database) + connection.drop_database(control.queues_database) for db in storage.message_databases: connection.drop_database(db) diff --git a/zaqar/tests/unit/transport/wsgi/v1/test_queue_lifecycle.py b/zaqar/tests/unit/transport/wsgi/v1/test_queue_lifecycle.py index 39ca4c9bc..444b39619 100644 --- a/zaqar/tests/unit/transport/wsgi/v1/test_queue_lifecycle.py +++ b/zaqar/tests/unit/transport/wsgi/v1/test_queue_lifecycle.py @@ -348,7 +348,7 @@ class TestQueueLifecycleMongoDB(QueueLifecycleBaseTest): storage = self.boot.storage._storage connection = storage.connection - connection.drop_database(storage.queues_database) + connection.drop_database(self.boot.control.queues_database) for db in storage.message_databases: connection.drop_database(db) diff --git a/zaqar/tests/unit/transport/wsgi/v1_1/test_claims.py b/zaqar/tests/unit/transport/wsgi/v1_1/test_claims.py index 43fa736ad..18b0e9c04 100644 --- a/zaqar/tests/unit/transport/wsgi/v1_1/test_claims.py +++ b/zaqar/tests/unit/transport/wsgi/v1_1/test_claims.py @@ -286,9 +286,10 @@ class TestClaimsMongoDB(ClaimsBaseTest): def tearDown(self): storage = self.boot.storage._storage + control = self.boot.control connection = storage.connection - connection.drop_database(storage.queues_database) + connection.drop_database(control.queues_database) for db in storage.message_databases: connection.drop_database(db) diff --git a/zaqar/tests/unit/transport/wsgi/v1_1/test_queue_lifecycle.py b/zaqar/tests/unit/transport/wsgi/v1_1/test_queue_lifecycle.py index 7cd4ac9f9..5cbb8304a 100644 --- a/zaqar/tests/unit/transport/wsgi/v1_1/test_queue_lifecycle.py +++ b/zaqar/tests/unit/transport/wsgi/v1_1/test_queue_lifecycle.py @@ -329,7 +329,7 @@ class TestQueueLifecycleMongoDB(QueueLifecycleBaseTest): storage = self.boot.storage._storage connection = storage.connection - connection.drop_database(storage.queues_database) + connection.drop_database(self.boot.control.queues_database) for db in storage.message_databases: connection.drop_database(db) diff --git a/zaqar/tests/unit/transport/wsgi/v2_0/test_claims.py b/zaqar/tests/unit/transport/wsgi/v2_0/test_claims.py index dea5484ba..320553dc4 100644 --- a/zaqar/tests/unit/transport/wsgi/v2_0/test_claims.py +++ b/zaqar/tests/unit/transport/wsgi/v2_0/test_claims.py @@ -286,9 +286,10 @@ class TestClaimsMongoDB(ClaimsBaseTest): def tearDown(self): storage = self.boot.storage._storage + control = self.boot.control connection = storage.connection - connection.drop_database(storage.queues_database) + connection.drop_database(control.queues_database) for db in storage.message_databases: connection.drop_database(db) diff --git a/zaqar/tests/unit/transport/wsgi/v2_0/test_queue_lifecycle.py b/zaqar/tests/unit/transport/wsgi/v2_0/test_queue_lifecycle.py index 487f6cc7f..618776553 100644 --- a/zaqar/tests/unit/transport/wsgi/v2_0/test_queue_lifecycle.py +++ b/zaqar/tests/unit/transport/wsgi/v2_0/test_queue_lifecycle.py @@ -326,10 +326,11 @@ class TestQueueLifecycleMongoDB(QueueLifecycleBaseTest): super(TestQueueLifecycleMongoDB, self).setUp() def tearDown(self): + control = self.boot.control storage = self.boot.storage._storage connection = storage.connection - connection.drop_database(storage.queues_database) + connection.drop_database(control.queues_database) for db in storage.message_databases: connection.drop_database(db)