Merge "Remove QueueController from data to control plane"

This commit is contained in:
Jenkins 2015-04-02 22:12:16 +00:00 committed by Gerrit Code Review
commit 2cac45f1ce
31 changed files with 534 additions and 496 deletions

View File

@ -72,6 +72,13 @@ oslo.config.opts =
zaqar.storage.stages = zaqar.storage.stages =
zaqar.notification.notifier = zaqar.notification.notifier:NotifierDriver zaqar.notification.notifier = zaqar.notification.notifier:NotifierDriver
zaqar.storage.mongodb.driver.queue.stages =
message_queue_handler = zaqar.storage.mongodb.messages:MessageQueueHandler
zaqar.storage.redis.driver.queue.stages =
message_queue_handler = zaqar.storage.redis.messages:MessageQueueHandler
[nosetests] [nosetests]
where=tests where=tests
verbosity=2 verbosity=2

View File

@ -9,7 +9,7 @@ storage = mongodb
[drivers:message_store:mongodb] [drivers:message_store:mongodb]
uri = mongodb://127.0.0.1:27017 uri = mongodb://127.0.0.1:27017
database = zaqar_test database = zaqar_test_pooled
[drivers:management_store:mongodb] [drivers:management_store:mongodb]
uri = mongodb://127.0.0.1:27017 uri = mongodb://127.0.0.1:27017

View File

@ -12,13 +12,11 @@
# implied. # implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import ddt
from zaqar.tests.functional import base from zaqar.tests.functional import base
from zaqar.tests.functional import helpers from zaqar.tests.functional import helpers
@ddt.ddt
class TestHealth(base.V1_1FunctionalTestBase): class TestHealth(base.V1_1FunctionalTestBase):
server_class = base.ZaqarAdminServer server_class = base.ZaqarAdminServer
@ -37,22 +35,16 @@ class TestHealth(base.V1_1FunctionalTestBase):
self.client.set_base_url(self.base_url) self.client.set_base_url(self.base_url)
@ddt.data( def test_health_with_pool(self):
{
'name': "pool_1",
'weight': 10,
'uri': "mongodb://localhost:27017"
}
)
def test_health_with_pool(self, params):
# FIXME(flwang): Please use mongodb after the sqlalchemy is disabled # FIXME(flwang): Please use mongodb after the sqlalchemy is disabled
# as pool node and the mongodb is working on gate successfully. # as pool node and the mongodb is working on gate successfully.
doc = helpers.create_pool_body( doc = helpers.create_pool_body(
weight=params.get('weight', 10), weight=10,
uri=params.get('uri', "mongodb://localhost:27017") uri="mongodb://localhost:27017",
options=dict(database='zaqar_test_pooled_1')
) )
pool_name = params.get('name', "pool_1") pool_name = "pool_1"
self.addCleanup(self.client.delete, url='/pools/' + pool_name) self.addCleanup(self.client.delete, url='/pools/' + pool_name)
result = self.client.put('/pools/' + pool_name, data=doc) result = self.client.put('/pools/' + pool_name, data=doc)

View File

@ -40,16 +40,22 @@ from zaqar.tests.unit.storage import base
class MongodbSetupMixin(object): class MongodbSetupMixin(object):
def _purge_databases(self): def _purge_databases(self):
if isinstance(self.driver, mongodb.DataDriver):
databases = (self.driver.message_databases + databases = (self.driver.message_databases +
[self.driver.queues_database, [self.control.queues_database,
self.driver.subscriptions_database]) self.driver.subscriptions_database])
else:
databases = [self.driver.queues_database]
for db in databases: for db in databases:
self.driver.connection.drop_database(db) self.driver.connection.drop_database(db)
def _prepare_conf(self): def _prepare_conf(self):
if options.MESSAGE_MONGODB_GROUP in self.conf:
self.config(options.MESSAGE_MONGODB_GROUP, self.config(options.MESSAGE_MONGODB_GROUP,
database=uuid.uuid4().hex) database=uuid.uuid4().hex)
if options.MANAGEMENT_MONGODB_GROUP in self.conf:
self.config(options.MANAGEMENT_MONGODB_GROUP, self.config(options.MANAGEMENT_MONGODB_GROUP,
database=uuid.uuid4().hex) database=uuid.uuid4().hex)
@ -69,6 +75,7 @@ class MongodbUtilsTest(MongodbSetupMixin, testing.TestBase):
MockDriver = collections.namedtuple('MockDriver', 'mongodb_conf') MockDriver = collections.namedtuple('MockDriver', 'mongodb_conf')
self.driver = MockDriver(self.mongodb_conf) self.driver = MockDriver(self.mongodb_conf)
self.control_driver = MockDriver(self.mongodb_conf)
def test_scope_queue_name(self): def test_scope_queue_name(self):
self.assertEqual(utils.scope_queue_name('my-q'), '/my-q') self.assertEqual(utils.scope_queue_name('my-q'), '/my-q')
@ -153,14 +160,12 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase):
def test_db_instance(self): def test_db_instance(self):
self.config(unreliable=True) self.config(unreliable=True)
cache = oslo_cache.get_cache() cache = oslo_cache.get_cache()
driver = mongodb.DataDriver(self.conf, cache) control = mongodb.ControlDriver(self.conf, cache)
data = mongodb.DataDriver(self.conf, cache, control)
databases = (driver.message_databases + for db in data.message_databases:
[driver.queues_database])
for db in databases:
self.assertThat(db.name, matchers.StartsWith( self.assertThat(db.name, matchers.StartsWith(
driver.mongodb_conf.database)) data.mongodb_conf.database))
def test_version_match(self): def test_version_match(self):
self.config(unreliable=True) self.config(unreliable=True)
@ -169,12 +174,14 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase):
with mock.patch('pymongo.MongoClient.server_info') as info: with mock.patch('pymongo.MongoClient.server_info') as info:
info.return_value = {'version': '2.1'} info.return_value = {'version': '2.1'}
self.assertRaises(RuntimeError, mongodb.DataDriver, self.assertRaises(RuntimeError, mongodb.DataDriver,
self.conf, cache) self.conf, cache,
mongodb.ControlDriver(self.conf, cache))
info.return_value = {'version': '2.11'} info.return_value = {'version': '2.11'}
try: try:
mongodb.DataDriver(self.conf, cache) mongodb.DataDriver(self.conf, cache,
mongodb.ControlDriver(self.conf, cache))
except RuntimeError: except RuntimeError:
self.fail('version match failed') self.fail('version match failed')
@ -186,21 +193,24 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase):
with mock.patch('pymongo.MongoClient.is_mongos') as is_mongos: with mock.patch('pymongo.MongoClient.is_mongos') as is_mongos:
is_mongos.__get__ = mock.Mock(return_value=False) is_mongos.__get__ = mock.Mock(return_value=False)
self.assertRaises(RuntimeError, mongodb.DataDriver, self.assertRaises(RuntimeError, mongodb.DataDriver,
self.conf, cache) self.conf, cache,
mongodb.ControlDriver(self.conf, cache))
def test_using_replset(self): def test_using_replset(self):
cache = oslo_cache.get_cache() cache = oslo_cache.get_cache()
with mock.patch('pymongo.MongoClient.nodes') as nodes: with mock.patch('pymongo.MongoClient.nodes') as nodes:
nodes.__get__ = mock.Mock(return_value=['node1', 'node2']) nodes.__get__ = mock.Mock(return_value=['node1', 'node2'])
mongodb.DataDriver(self.conf, cache) mongodb.DataDriver(self.conf, cache,
mongodb.ControlDriver(self.conf, cache))
def test_using_mongos(self): def test_using_mongos(self):
cache = oslo_cache.get_cache() cache = oslo_cache.get_cache()
with mock.patch('pymongo.MongoClient.is_mongos') as is_mongos: with mock.patch('pymongo.MongoClient.is_mongos') as is_mongos:
is_mongos.__get__ = mock.Mock(return_value=True) is_mongos.__get__ = mock.Mock(return_value=True)
mongodb.DataDriver(self.conf, cache) mongodb.DataDriver(self.conf, cache,
mongodb.ControlDriver(self.conf, cache))
def test_write_concern_check_works(self): def test_write_concern_check_works(self):
cache = oslo_cache.get_cache() cache = oslo_cache.get_cache()
@ -211,17 +221,21 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase):
with mock.patch('pymongo.MongoClient.write_concern') as wc: with mock.patch('pymongo.MongoClient.write_concern') as wc:
wc.__get__ = mock.Mock(return_value={'w': 1}) wc.__get__ = mock.Mock(return_value={'w': 1})
self.assertRaises(RuntimeError, mongodb.DataDriver, self.assertRaises(RuntimeError, mongodb.DataDriver,
self.conf, cache) self.conf, cache,
mongodb.ControlDriver(self.conf, cache))
wc.__get__ = mock.Mock(return_value={'w': 2}) wc.__get__ = mock.Mock(return_value={'w': 2})
mongodb.DataDriver(self.conf, cache) mongodb.DataDriver(self.conf, cache,
mongodb.ControlDriver(self.conf, cache))
def test_write_concern_is_set(self): def test_write_concern_is_set(self):
cache = oslo_cache.get_cache() cache = oslo_cache.get_cache()
with mock.patch('pymongo.MongoClient.is_mongos') as is_mongos: with mock.patch('pymongo.MongoClient.is_mongos') as is_mongos:
is_mongos.__get__ = mock.Mock(return_value=True) is_mongos.__get__ = mock.Mock(return_value=True)
driver = mongodb.DataDriver(self.conf, cache) driver = mongodb.DataDriver(self.conf, cache,
mongodb.ControlDriver
(self.conf, cache))
wc = driver.connection.write_concern wc = driver.connection.write_concern
self.assertEqual(wc['w'], 'majority') self.assertEqual(wc['w'], 'majority')
self.assertEqual(wc['j'], False) self.assertEqual(wc['j'], False)
@ -230,25 +244,16 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase):
@testing.requires_mongodb @testing.requires_mongodb
class MongodbQueueTests(MongodbSetupMixin, base.QueueControllerTest): class MongodbQueueTests(MongodbSetupMixin, base.QueueControllerTest):
driver_class = mongodb.DataDriver driver_class = mongodb.ControlDriver
config_file = 'wsgi_mongodb.conf' config_file = 'wsgi_mongodb.conf'
controller_class = controllers.QueueController controller_class = controllers.QueueController
control_driver_class = mongodb.ControlDriver
def test_indexes(self): def test_indexes(self):
collection = self.controller._collection collection = self.controller._collection
indexes = collection.index_information() indexes = collection.index_information()
self.assertIn('p_q_1', indexes) self.assertIn('p_q_1', indexes)
def test_messages_purged(self):
queue_name = 'test'
self.controller.create(queue_name)
self.message_controller.post(queue_name,
[{'ttl': 60}],
1234)
self.controller.delete(queue_name)
for collection in self.message_controller._collections:
self.assertEqual(collection.find({'q': queue_name}).count(), 0)
def test_raises_connection_error(self): def test_raises_connection_error(self):
with mock.patch.object(cursor.Cursor, with mock.patch.object(cursor.Cursor,
@ -268,6 +273,7 @@ class MongodbMessageTests(MongodbSetupMixin, base.MessageControllerTest):
driver_class = mongodb.DataDriver driver_class = mongodb.DataDriver
config_file = 'wsgi_mongodb.conf' config_file = 'wsgi_mongodb.conf'
controller_class = controllers.MessageController controller_class = controllers.MessageController
control_driver_class = mongodb.ControlDriver
# NOTE(kgriffs): MongoDB's TTL scavenger only runs once a minute # NOTE(kgriffs): MongoDB's TTL scavenger only runs once a minute
gc_interval = 60 gc_interval = 60
@ -349,6 +355,7 @@ class MongodbFIFOMessageTests(MongodbSetupMixin, base.MessageControllerTest):
driver_class = mongodb.DataDriver driver_class = mongodb.DataDriver
config_file = 'wsgi_fifo_mongodb.conf' config_file = 'wsgi_fifo_mongodb.conf'
controller_class = controllers.FIFOMessageController controller_class = controllers.FIFOMessageController
control_driver_class = mongodb.ControlDriver
# NOTE(kgriffs): MongoDB's TTL scavenger only runs once a minute # NOTE(kgriffs): MongoDB's TTL scavenger only runs once a minute
gc_interval = 60 gc_interval = 60
@ -427,6 +434,7 @@ class MongodbClaimTests(MongodbSetupMixin, base.ClaimControllerTest):
driver_class = mongodb.DataDriver driver_class = mongodb.DataDriver
config_file = 'wsgi_mongodb.conf' config_file = 'wsgi_mongodb.conf'
controller_class = controllers.ClaimController controller_class = controllers.ClaimController
control_driver_class = mongodb.ControlDriver
def test_claim_doesnt_exist(self): def test_claim_doesnt_exist(self):
"""Verifies that operations fail on expired/missing claims. """Verifies that operations fail on expired/missing claims.
@ -462,6 +470,7 @@ class MongodbSubscriptionTests(MongodbSetupMixin,
driver_class = mongodb.DataDriver driver_class = mongodb.DataDriver
config_file = 'wsgi_mongodb.conf' config_file = 'wsgi_mongodb.conf'
controller_class = controllers.SubscriptionController controller_class = controllers.SubscriptionController
control_driver_class = mongodb.ControlDriver
# #
@ -473,6 +482,7 @@ class MongodbPoolsTests(base.PoolsControllerTest):
config_file = 'wsgi_mongodb.conf' config_file = 'wsgi_mongodb.conf'
driver_class = mongodb.ControlDriver driver_class = mongodb.ControlDriver
controller_class = controllers.PoolsController controller_class = controllers.PoolsController
control_driver_class = mongodb.ControlDriver
def setUp(self): def setUp(self):
super(MongodbPoolsTests, self).setUp() super(MongodbPoolsTests, self).setUp()
@ -500,6 +510,7 @@ class MongodbPoolsTests(base.PoolsControllerTest):
class MongodbCatalogueTests(base.CatalogueControllerTest): class MongodbCatalogueTests(base.CatalogueControllerTest):
driver_class = mongodb.ControlDriver driver_class = mongodb.ControlDriver
controller_class = controllers.CatalogueController controller_class = controllers.CatalogueController
control_driver_class = mongodb.ControlDriver
def setUp(self): def setUp(self):
super(MongodbCatalogueTests, self).setUp() super(MongodbCatalogueTests, self).setUp()
@ -522,15 +533,6 @@ class PooledMessageTests(base.MessageControllerTest):
gc_interval = 60 gc_interval = 60
@testing.requires_mongodb
class PooledQueueTests(base.QueueControllerTest):
config_file = 'wsgi_mongodb_pooled.conf'
controller_class = pooling.QueueController
driver_class = pooling.DataDriver
control_driver_class = mongodb.ControlDriver
controller_base_class = storage.Queue
@testing.requires_mongodb @testing.requires_mongodb
class PooledClaimsTests(base.ClaimControllerTest): class PooledClaimsTests(base.ClaimControllerTest):
config_file = 'wsgi_mongodb_pooled.conf' config_file = 'wsgi_mongodb_pooled.conf'
@ -554,6 +556,7 @@ class PooledClaimsTests(base.ClaimControllerTest):
class MongodbFlavorsTest(base.FlavorsControllerTest): class MongodbFlavorsTest(base.FlavorsControllerTest):
driver_class = mongodb.ControlDriver driver_class = mongodb.ControlDriver
controller_class = controllers.FlavorsController controller_class = controllers.FlavorsController
control_driver_class = mongodb.ControlDriver
def setUp(self): def setUp(self):
super(MongodbFlavorsTest, self).setUp() super(MongodbFlavorsTest, self).setUp()

View File

@ -24,6 +24,7 @@ import redis
from zaqar.common import errors from zaqar.common import errors
from zaqar.openstack.common.cache import cache as oslo_cache from zaqar.openstack.common.cache import cache as oslo_cache
from zaqar import storage from zaqar import storage
from zaqar.storage import mongodb
from zaqar.storage.redis import controllers from zaqar.storage.redis import controllers
from zaqar.storage.redis import driver from zaqar.storage.redis import driver
from zaqar.storage.redis import messages from zaqar.storage.redis import messages
@ -173,7 +174,9 @@ class RedisDriverTest(testing.TestBase):
def test_db_instance(self): def test_db_instance(self):
cache = oslo_cache.get_cache() cache = oslo_cache.get_cache()
redis_driver = driver.DataDriver(self.conf, cache) redis_driver = driver.DataDriver(self.conf, cache,
driver.ControlDriver
(self.conf, cache))
self.assertTrue(isinstance(redis_driver.connection, redis.StrictRedis)) self.assertTrue(isinstance(redis_driver.connection, redis.StrictRedis))
@ -183,12 +186,14 @@ class RedisDriverTest(testing.TestBase):
with mock.patch('redis.StrictRedis.info') as info: with mock.patch('redis.StrictRedis.info') as info:
info.return_value = {'redis_version': '2.4.6'} info.return_value = {'redis_version': '2.4.6'}
self.assertRaises(RuntimeError, driver.DataDriver, self.assertRaises(RuntimeError, driver.DataDriver,
self.conf, cache) self.conf, cache,
driver.ControlDriver(self.conf, cache))
info.return_value = {'redis_version': '2.11'} info.return_value = {'redis_version': '2.11'}
try: try:
driver.DataDriver(self.conf, cache) driver.DataDriver(self.conf, cache,
driver.ControlDriver(self.conf, cache))
except RuntimeError: except RuntimeError:
self.fail('version match failed') self.fail('version match failed')
@ -281,6 +286,7 @@ class RedisQueuesTest(base.QueueControllerTest):
driver_class = driver.DataDriver driver_class = driver.DataDriver
config_file = 'wsgi_redis.conf' config_file = 'wsgi_redis.conf'
controller_class = controllers.QueueController controller_class = controllers.QueueController
control_driver_class = mongodb.ControlDriver
def setUp(self): def setUp(self):
super(RedisQueuesTest, self).setUp() super(RedisQueuesTest, self).setUp()
@ -297,11 +303,11 @@ class RedisMessagesTest(base.MessageControllerTest):
driver_class = driver.DataDriver driver_class = driver.DataDriver
config_file = 'wsgi_redis.conf' config_file = 'wsgi_redis.conf'
controller_class = controllers.MessageController controller_class = controllers.MessageController
control_driver_class = mongodb.ControlDriver
def setUp(self): def setUp(self):
super(RedisMessagesTest, self).setUp() super(RedisMessagesTest, self).setUp()
self.connection = self.driver.connection self.connection = self.driver.connection
self.queue_ctrl = self.driver.queue_controller
def tearDown(self): def tearDown(self):
super(RedisMessagesTest, self).tearDown() super(RedisMessagesTest, self).tearDown()
@ -309,7 +315,7 @@ class RedisMessagesTest(base.MessageControllerTest):
def test_count(self): def test_count(self):
queue_name = 'get-count' queue_name = 'get-count'
self.queue_ctrl.create(queue_name) self.queue_controller.create(queue_name)
msgs = [{ msgs = [{
'ttl': 300, 'ttl': 300,
@ -325,13 +331,13 @@ class RedisMessagesTest(base.MessageControllerTest):
def test_empty_queue_exception(self): def test_empty_queue_exception(self):
queue_name = 'empty-queue-test' queue_name = 'empty-queue-test'
self.queue_ctrl.create(queue_name) self.queue_controller.create(queue_name)
self.assertRaises(storage.errors.QueueIsEmpty, self.assertRaises(storage.errors.QueueIsEmpty,
self.controller.first, queue_name) self.controller.first, queue_name)
def test_gc(self): def test_gc(self):
self.queue_ctrl.create(self.queue_name) self.queue_controller.create(self.queue_name)
self.controller.post(self.queue_name, self.controller.post(self.queue_name,
[{'ttl': 0, 'body': {}}], [{'ttl': 0, 'body': {}}],
client_uuid=str(uuid.uuid4())) client_uuid=str(uuid.uuid4()))
@ -353,12 +359,11 @@ class RedisClaimsTest(base.ClaimControllerTest):
driver_class = driver.DataDriver driver_class = driver.DataDriver
config_file = 'wsgi_redis.conf' config_file = 'wsgi_redis.conf'
controller_class = controllers.ClaimController controller_class = controllers.ClaimController
control_driver_class = mongodb.ControlDriver
def setUp(self): def setUp(self):
super(RedisClaimsTest, self).setUp() super(RedisClaimsTest, self).setUp()
self.connection = self.driver.connection self.connection = self.driver.connection
self.queue_ctrl = self.driver.queue_controller
self.message_ctrl = self.driver.message_controller
def tearDown(self): def tearDown(self):
super(RedisClaimsTest, self).tearDown() super(RedisClaimsTest, self).tearDown()
@ -367,7 +372,7 @@ class RedisClaimsTest(base.ClaimControllerTest):
def test_claim_doesnt_exist(self): def test_claim_doesnt_exist(self):
queue_name = 'no-such-claim' queue_name = 'no-such-claim'
epoch = '000000000000000000000000' epoch = '000000000000000000000000'
self.queue_ctrl.create(queue_name) self.queue_controller.create(queue_name)
self.assertRaises(storage.errors.ClaimDoesNotExist, self.assertRaises(storage.errors.ClaimDoesNotExist,
self.controller.get, queue_name, self.controller.get, queue_name,
epoch, project=None) epoch, project=None)
@ -383,10 +388,10 @@ class RedisClaimsTest(base.ClaimControllerTest):
claim_id, {}, project=None) claim_id, {}, project=None)
def test_gc(self): def test_gc(self):
self.queue_ctrl.create(self.queue_name) self.queue_controller.create(self.queue_name)
for _ in range(100): for _ in range(100):
self.message_ctrl.post(self.queue_name, self.message_controller.post(self.queue_name,
[{'ttl': 300, 'body': 'yo gabba'}], [{'ttl': 300, 'body': 'yo gabba'}],
client_uuid=str(uuid.uuid4())) client_uuid=str(uuid.uuid4()))

View File

@ -65,7 +65,7 @@ class PoolCatalogTest(testing.TestBase):
def test_lookup_loads_correct_driver(self): def test_lookup_loads_correct_driver(self):
storage = self.catalog.lookup(self.queue, self.project) storage = self.catalog.lookup(self.queue, self.project)
self.assertIsInstance(storage, mongodb.DataDriver) self.assertIsInstance(storage._storage, mongodb.DataDriver)
def test_lookup_returns_none_if_queue_not_mapped(self): def test_lookup_returns_none_if_queue_not_mapped(self):
self.assertIsNone(self.catalog.lookup('not', 'mapped')) self.assertIsNone(self.catalog.lookup('not', 'mapped'))
@ -77,14 +77,14 @@ class PoolCatalogTest(testing.TestBase):
def test_register_leads_to_successful_lookup(self): def test_register_leads_to_successful_lookup(self):
self.catalog.register('not_yet', 'mapped') self.catalog.register('not_yet', 'mapped')
storage = self.catalog.lookup('not_yet', 'mapped') storage = self.catalog.lookup('not_yet', 'mapped')
self.assertIsInstance(storage, mongodb.DataDriver) self.assertIsInstance(storage._storage, mongodb.DataDriver)
def test_register_with_flavor(self): def test_register_with_flavor(self):
queue = 'test' queue = 'test'
self.catalog.register(queue, project=self.project, self.catalog.register(queue, project=self.project,
flavor=self.flavor) flavor=self.flavor)
storage = self.catalog.lookup(queue, self.project) storage = self.catalog.lookup(queue, self.project)
self.assertIsInstance(storage, mongodb.DataDriver) self.assertIsInstance(storage._storage, mongodb.DataDriver)
def test_register_with_fake_flavor(self): def test_register_with_fake_flavor(self):
self.assertRaises(errors.FlavorDoesNotExist, self.assertRaises(errors.FlavorDoesNotExist,

View File

@ -1,99 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import random
import uuid
import six
from zaqar.openstack.common.cache import cache as oslo_cache
from zaqar.storage import pooling
from zaqar.storage import utils
from zaqar import tests as testing
@testing.requires_mongodb
class PoolQueuesTest(testing.TestBase):
config_file = 'wsgi_mongodb_pooled.conf'
def setUp(self):
super(PoolQueuesTest, self).setUp()
cache = oslo_cache.get_cache()
control = utils.load_storage_driver(self.conf, cache,
control_mode=True)
self.pools_ctrl = control.pools_controller
self.driver = pooling.DataDriver(self.conf, cache, control)
self.controller = self.driver.queue_controller
# fake two pools
for i in six.moves.xrange(2):
options = {'database': "zaqar_test_pools_" + str(i)}
self.pools_ctrl.create(str(uuid.uuid1()), 100,
'mongodb://localhost:27017',
options=options)
def tearDown(self):
self.pools_ctrl.drop_all()
super(PoolQueuesTest, self).tearDown()
def test_ping(self):
ping = self.driver.is_alive()
self.assertTrue(ping)
def test_listing(self):
project = "I.G"
interaction = self.controller.list(project=project,
detailed=False)
queues = list(next(interaction))
self.assertEqual(len(queues), 0)
for n in six.moves.xrange(10):
name = 'queue_%d' % n
self.controller.create(name, project=project)
self.controller.set_metadata(name,
metadata=random.getrandbits(12),
project=project)
interaction = self.controller.list(project=project,
detailed=True,
limit=7)
queues.extend(next(interaction))
marker = next(interaction)
self.assertEqual(len(queues), 7)
interaction = self.controller.list(project=project,
detailed=True,
limit=7,
marker=marker)
queues.extend(next(interaction))
self.assertEqual(len(queues), 10)
# ordered by name as a whole
self.assertTrue(all(queues[i]['name'] <= queues[i + 1]['name']
for i in six.moves.xrange(len(queues) - 1)))
for n in six.moves.xrange(10):
self.controller.delete('queue_%d' % n, project=project)
interaction = self.controller.list(project=project,
detailed=False)
queues = list(next(interaction))
self.assertEqual(len(queues), 0)

View File

@ -103,17 +103,17 @@ class Bootstrap(object):
@decorators.lazy_property(write=False) @decorators.lazy_property(write=False)
def storage(self): def storage(self):
LOG.debug(u'Loading storage driver') LOG.debug(u'Loading storage driver')
if self.conf.pooling: if self.conf.pooling:
LOG.debug(u'Storage pooling enabled') LOG.debug(u'Storage pooling enabled')
storage_driver = pooling.DataDriver(self.conf, self.cache, storage_driver = pooling.DataDriver(self.conf, self.cache,
self.control) self.control)
else: else:
storage_driver = storage_utils.load_storage_driver( storage_driver = storage_utils.load_storage_driver(
self.conf, self.cache) self.conf, self.cache, control_driver=self.control)
LOG.debug(u'Loading storage pipeline') LOG.debug(u'Loading storage pipeline')
return pipeline.DataDriver(self.conf, storage_driver) return pipeline.DataDriver(self.conf, storage_driver,
self.control)
@decorators.lazy_property(write=False) @decorators.lazy_property(write=False)
def control(self): def control(self):

View File

@ -25,6 +25,7 @@ import enum
from oslo_config import cfg from oslo_config import cfg
import six import six
from zaqar.common import decorators
import zaqar.openstack.common.log as logging import zaqar.openstack.common.log as logging
from zaqar.storage import errors from zaqar.storage import errors
from zaqar.storage import utils from zaqar.storage import utils
@ -95,8 +96,11 @@ class DataDriverBase(DriverBase):
BASE_CAPABILITIES = [] BASE_CAPABILITIES = []
def __init__(self, conf, cache): def __init__(self, conf, cache, control_driver):
super(DataDriverBase, self).__init__(conf, cache) super(DataDriverBase, self).__init__(conf, cache)
# creating ControlDriver instance for accessing QueueController's
# data from DataDriver
self.control_driver = control_driver
@abc.abstractmethod @abc.abstractmethod
def is_alive(self): def is_alive(self):
@ -195,6 +199,9 @@ class DataDriverBase(DriverBase):
_handle_status('delete_claim', func) _handle_status('delete_claim', func)
# delete queue # delete queue
func = functools.partial(self.message_controller.bulk_delete,
queue, msg_ids, project=project)
_handle_status('bulk_delete_messages', func)
func = functools.partial(self.queue_controller.delete, func = functools.partial(self.queue_controller.delete,
queue, project=project) queue, project=project)
_handle_status('delete_queue', func) _handle_status('delete_queue', func)
@ -211,10 +218,9 @@ class DataDriverBase(DriverBase):
""" """
pass pass
@abc.abstractproperty @decorators.lazy_property(write=False)
def queue_controller(self): def queue_controller(self):
"""Returns the driver's queue controller.""" return self.control_driver.queue_controller
raise NotImplementedError
@abc.abstractproperty @abc.abstractproperty
def message_controller(self): def message_controller(self):
@ -265,6 +271,11 @@ class ControlDriverBase(DriverBase):
"""Returns storage's flavor management controller.""" """Returns storage's flavor management controller."""
raise NotImplementedError raise NotImplementedError
@abc.abstractproperty
def queue_controller(self):
"""Returns the driver's queue controller."""
raise NotImplementedError
class ControllerBase(object): class ControllerBase(object):
"""Top-level class for controllers. """Top-level class for controllers.

View File

@ -78,8 +78,8 @@ class DataDriver(storage.DataDriverBase):
_COL_SUFIX = "_messages_p" _COL_SUFIX = "_messages_p"
def __init__(self, conf, cache): def __init__(self, conf, cache, control_driver):
super(DataDriver, self).__init__(conf, cache) super(DataDriver, self).__init__(conf, cache, control_driver)
self.mongodb_conf = self.conf[options.MESSAGE_MONGODB_GROUP] self.mongodb_conf = self.conf[options.MESSAGE_MONGODB_GROUP]
@ -146,17 +146,6 @@ class DataDriver(storage.DataDriverBase):
KPI['message_volume'] = message_volume KPI['message_volume'] = message_volume
return KPI return KPI
@decorators.lazy_property(write=False)
def queues_database(self):
"""Database dedicated to the "queues" collection.
The queues collection is separated out into its own database
to avoid writer lock contention with the messages collections.
"""
name = self.mongodb_conf.database + '_queues'
return self.connection[name]
@decorators.lazy_property(write=False) @decorators.lazy_property(write=False)
def message_databases(self): def message_databases(self):
"""List of message databases, ordered by partition number.""" """List of message databases, ordered by partition number."""
@ -185,10 +174,6 @@ class DataDriver(storage.DataDriverBase):
"""MongoDB client connection instance.""" """MongoDB client connection instance."""
return _connection(self.mongodb_conf) return _connection(self.mongodb_conf)
@decorators.lazy_property(write=False)
def queue_controller(self):
return controllers.QueueController(self)
@decorators.lazy_property(write=False) @decorators.lazy_property(write=False)
def message_controller(self): def message_controller(self):
return controllers.MessageController(self) return controllers.MessageController(self)
@ -236,6 +221,21 @@ class ControlDriver(storage.ControlDriverBase):
name = self.mongodb_conf.database name = self.mongodb_conf.database
return self.connection[name] return self.connection[name]
@decorators.lazy_property(write=False)
def queues_database(self):
"""Database dedicated to the "queues" collection.
The queues collection is separated out into its own database
to avoid writer lock contention with the messages collections.
"""
name = self.mongodb_conf.database + '_queues'
return self.connection[name]
@decorators.lazy_property(write=False)
def queue_controller(self):
return controllers.QueueController(self)
@property @property
def pools_controller(self): def pools_controller(self):
return controllers.PoolsController(self) return controllers.PoolsController(self)

View File

@ -862,3 +862,64 @@ def _basic_message(msg, now):
'body': msg['b'], 'body': msg['b'],
'claim_id': str(msg['c']['id']) if msg['c']['id'] else None 'claim_id': str(msg['c']['id']) if msg['c']['id'] else None
} }
# NOTE(kgriffs): E.g.: 'queuecontroller:exists:5083853/my-queue'
_QUEUE_CACHE_PREFIX = 'queuecontroller:'
_QUEUE_CACHE_TTL = 5
def _queue_exists_key(queue, project=None):
# NOTE(kgriffs): Use string concatenation for performance,
# also put project first since it is guaranteed to be
# unique, which should reduce lookup time.
return _QUEUE_CACHE_PREFIX + 'exists:' + str(project) + '/' + queue
class MessageQueueHandler(object):
def __init__(self, driver, control_driver):
self.driver = driver
self._cache = self.driver.cache
self.queue_controller = self.driver.queue_controller
self.message_controller = self.driver.message_controller
def delete(self, queue_name, project=None):
self.message_controller._purge_queue(queue_name, project)
@utils.raises_conn_error
@utils.retries_on_autoreconnect
def stats(self, name, project=None):
if not self.queue_controller.exists(name, project=project):
raise errors.QueueDoesNotExist(name, project)
controller = self.message_controller
active = controller._count(name, project=project,
include_claimed=False)
total = controller._count(name, project=project,
include_claimed=True)
message_stats = {
'claimed': total - active,
'free': active,
'total': total,
}
try:
oldest = controller.first(name, project=project, sort=1)
newest = controller.first(name, project=project, sort=-1)
except errors.QueueIsEmpty:
pass
else:
now = timeutils.utcnow_ts()
message_stats['oldest'] = utils.stat_message(oldest, now)
message_stats['newest'] = utils.stat_message(newest, now)
return {'messages': message_stats}
def _get_scoped_query(name, project):
return {'p_q': utils.scope_queue_name(name, project)}

View File

@ -278,39 +278,12 @@ class QueueController(storage.Queue):
@utils.retries_on_autoreconnect @utils.retries_on_autoreconnect
@_exists.purges @_exists.purges
def _delete(self, name, project=None): def _delete(self, name, project=None):
self.driver.message_controller._purge_queue(name, project)
self._collection.remove(_get_scoped_query(name, project)) self._collection.remove(_get_scoped_query(name, project))
@utils.raises_conn_error @utils.raises_conn_error
@utils.retries_on_autoreconnect @utils.retries_on_autoreconnect
def _stats(self, name, project=None): def _stats(self, name, project=None):
if not self.exists(name, project=project):
raise errors.QueueDoesNotExist(name, project)
controller = self.driver.message_controller
active = controller._count(name, project=project,
include_claimed=False)
total = controller._count(name, project=project,
include_claimed=True)
message_stats = {
'claimed': total - active,
'free': active,
'total': total,
}
try:
oldest = controller.first(name, project=project, sort=1)
newest = controller.first(name, project=project, sort=-1)
except errors.QueueIsEmpty:
pass pass
else:
now = timeutils.utcnow_ts()
message_stats['oldest'] = utils.stat_message(oldest, now)
message_stats['newest'] = utils.stat_message(newest, now)
return {'messages': message_stats}
def _get_scoped_query(name, project): def _get_scoped_query(name, project):

View File

@ -47,7 +47,8 @@ class SubscriptionController(base.Subscription):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(SubscriptionController, self).__init__(*args, **kwargs) super(SubscriptionController, self).__init__(*args, **kwargs)
self._collection = self.driver.subscriptions_database.subscriptions self._collection = self.driver.subscriptions_database.subscriptions
self._queue_collection = self.driver.queues_database.queues queue_col = self.driver.control_driver.queues_database.queues
self._queue_collection = queue_col
self._collection.ensure_index(SUBSCRIPTIONS_INDEX, unique=True) self._collection.ensure_index(SUBSCRIPTIONS_INDEX, unique=True)
@utils.raises_conn_error @utils.raises_conn_error

View File

@ -85,7 +85,7 @@ def _get_storage_pipeline(resource_name, conf, *args, **kwargs):
return pipeline return pipeline
def _get_builtin_entry_points(resource_name, storage): def _get_builtin_entry_points(resource_name, storage, control_driver):
# Load builtin stages # Load builtin stages
builtin_entry_points = [] builtin_entry_points = []
@ -96,7 +96,8 @@ def _get_builtin_entry_points(resource_name, storage):
namespace = '%s.%s.stages' % (storage.__module__, resource_name) namespace = '%s.%s.stages' % (storage.__module__, resource_name)
extensions = extension.ExtensionManager(namespace, extensions = extension.ExtensionManager(namespace,
invoke_on_load=True, invoke_on_load=True,
invoke_args=[storage]) invoke_args=[storage,
control_driver])
if len(extensions.extensions) == 0: if len(extensions.extensions) == 0:
return [] return []
@ -115,10 +116,10 @@ class DataDriver(base.DataDriverBase):
last step in the pipeline last step in the pipeline
""" """
def __init__(self, conf, storage): def __init__(self, conf, storage, control_driver):
# NOTE(kgriffs): Pass None for cache since it won't ever # NOTE(kgriffs): Pass None for cache since it won't ever
# be referenced. # be referenced.
super(DataDriver, self).__init__(conf, None) super(DataDriver, self).__init__(conf, None, control_driver)
self._storage = storage self._storage = storage
@property @property
@ -133,14 +134,16 @@ class DataDriver(base.DataDriverBase):
@decorators.lazy_property(write=False) @decorators.lazy_property(write=False)
def queue_controller(self): def queue_controller(self):
stages = _get_builtin_entry_points('queue', self._storage) stages = _get_builtin_entry_points('queue', self._storage,
self.control_driver)
stages.extend(_get_storage_pipeline('queue', self.conf)) stages.extend(_get_storage_pipeline('queue', self.conf))
stages.append(self._storage.queue_controller) stages.append(self._storage.queue_controller)
return common.Pipeline(stages) return common.Pipeline(stages)
@decorators.lazy_property(write=False) @decorators.lazy_property(write=False)
def message_controller(self): def message_controller(self):
stages = _get_builtin_entry_points('message', self._storage) stages = _get_builtin_entry_points('message', self._storage,
self.control_driver)
kwargs = {'subscription_controller': kwargs = {'subscription_controller':
self._storage.subscription_controller} self._storage.subscription_controller}
stages.extend(_get_storage_pipeline('message', self.conf, **kwargs)) stages.extend(_get_storage_pipeline('message', self.conf, **kwargs))
@ -149,14 +152,16 @@ class DataDriver(base.DataDriverBase):
@decorators.lazy_property(write=False) @decorators.lazy_property(write=False)
def claim_controller(self): def claim_controller(self):
stages = _get_builtin_entry_points('claim', self._storage) stages = _get_builtin_entry_points('claim', self._storage,
self.control_driver)
stages.extend(_get_storage_pipeline('claim', self.conf)) stages.extend(_get_storage_pipeline('claim', self.conf))
stages.append(self._storage.claim_controller) stages.append(self._storage.claim_controller)
return common.Pipeline(stages) return common.Pipeline(stages)
@decorators.lazy_property(write=False) @decorators.lazy_property(write=False)
def subscription_controller(self): def subscription_controller(self):
stages = _get_builtin_entry_points('subscription', self._storage) stages = _get_builtin_entry_points('subscription', self._storage,
self.control_driver)
stages.extend(_get_storage_pipeline('subscription', self.conf)) stages.extend(_get_storage_pipeline('subscription', self.conf))
stages.append(self._storage.subscription_controller) stages.append(self._storage.subscription_controller)
return common.Pipeline(stages) return common.Pipeline(stages)

View File

@ -23,6 +23,7 @@ from zaqar.common.storage import select
from zaqar.openstack.common import log from zaqar.openstack.common import log
from zaqar import storage from zaqar import storage
from zaqar.storage import errors from zaqar.storage import errors
from zaqar.storage import pipeline
from zaqar.storage import utils from zaqar.storage import utils
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -69,8 +70,8 @@ class DataDriver(storage.DataDriverBase):
BASE_CAPABILITIES = tuple(storage.Capabilities) BASE_CAPABILITIES = tuple(storage.Capabilities)
def __init__(self, conf, cache, control): def __init__(self, conf, cache, control, control_driver=None):
super(DataDriver, self).__init__(conf, cache) super(DataDriver, self).__init__(conf, cache, control_driver)
self._pool_catalog = Catalog(conf, cache, control) self._pool_catalog = Catalog(conf, cache, control)
@property @property
@ -404,6 +405,7 @@ class Catalog(object):
self._drivers = {} self._drivers = {}
self._conf = conf self._conf = conf
self._cache = cache self._cache = cache
self.control = control
self._conf.register_opts(_CATALOG_OPTIONS, group=_CATALOG_GROUP) self._conf.register_opts(_CATALOG_OPTIONS, group=_CATALOG_GROUP)
self._catalog_conf = self._conf[_CATALOG_GROUP] self._catalog_conf = self._conf[_CATALOG_GROUP]
@ -424,7 +426,10 @@ class Catalog(object):
pool = self._pools_ctrl.get(pool_id, detailed=True) pool = self._pools_ctrl.get(pool_id, detailed=True)
conf = utils.dynamic_conf(pool['uri'], pool['options'], conf = utils.dynamic_conf(pool['uri'], pool['options'],
conf=self._conf) conf=self._conf)
return utils.load_storage_driver(conf, self._cache) storage = utils.load_storage_driver(conf,
self._cache,
control_driver=self.control)
return pipeline.DataDriver(conf, storage, self.control)
@decorators.caches(_pool_cache_key, _POOL_CACHE_TTL) @decorators.caches(_pool_cache_key, _POOL_CACHE_TTL)
def _pool_id(self, queue, project=None): def _pool_id(self, queue, project=None):

View File

@ -149,8 +149,8 @@ class DataDriver(storage.DataDriverBase):
_DRIVER_OPTIONS = options._config_options() _DRIVER_OPTIONS = options._config_options()
def __init__(self, conf, cache): def __init__(self, conf, cache, control_driver):
super(DataDriver, self).__init__(conf, cache) super(DataDriver, self).__init__(conf, cache, control_driver)
self.redis_conf = self.conf[options.MESSAGE_REDIS_GROUP] self.redis_conf = self.conf[options.MESSAGE_REDIS_GROUP]
server_version = self.connection.info()['redis_version'] server_version = self.connection.info()['redis_version']
@ -194,10 +194,6 @@ class DataDriver(storage.DataDriverBase):
"""Redis client connection instance.""" """Redis client connection instance."""
return _get_redis_client(self) return _get_redis_client(self)
@decorators.lazy_property(write=False)
def queue_controller(self):
return controllers.QueueController(self)
@decorators.lazy_property(write=False) @decorators.lazy_property(write=False)
def message_controller(self): def message_controller(self):
return controllers.MessageController(self) return controllers.MessageController(self)
@ -226,6 +222,10 @@ class ControlDriver(storage.ControlDriverBase):
"""Redis client connection instance.""" """Redis client connection instance."""
return _get_redis_client(self) return _get_redis_client(self)
@decorators.lazy_property(write=False)
def queue_controller(self):
return controllers.QueueController(self)
@property @property
def pools_controller(self): def pools_controller(self):
raise NotImplementedError() raise NotImplementedError()

View File

@ -17,6 +17,7 @@ import uuid
from oslo_utils import encodeutils from oslo_utils import encodeutils
from oslo_utils import timeutils from oslo_utils import timeutils
import redis
from zaqar.common import decorators from zaqar.common import decorators
from zaqar import storage from zaqar import storage
@ -558,3 +559,63 @@ def _filter_messages(messages, filters, to_basic, marker):
yield msg.to_basic(now) yield msg.to_basic(now)
else: else:
yield msg yield msg
QUEUES_SET_STORE_NAME = 'queues_set'
class MessageQueueHandler(object):
def __init__(self, driver, control_driver):
self.driver = driver
self._client = self.driver.connection
self._queue_ctrl = self.driver.queue_controller
self._message_ctrl = self.driver.message_controller
self._claim_ctrl = self.driver.claim_controller
@utils.raises_conn_error
def create(self, name, metadata=None, project=None):
with self._client.pipeline() as pipe:
self._message_ctrl._create_msgset(name, project, pipe)
try:
pipe.execute()
except redis.exceptions.ResponseError:
return False
@utils.raises_conn_error
@utils.retries_on_connection_error
def delete(self, name, project=None):
with self._client.pipeline() as pipe:
self._message_ctrl._delete_msgset(name, project, pipe)
self._message_ctrl._delete_queue_messages(name, project, pipe)
pipe.execute()
@utils.raises_conn_error
@utils.retries_on_connection_error
def stats(self, name, project=None):
if not self._queue_ctrl.exists(name, project=project):
raise errors.QueueDoesNotExist(name, project)
total = self._message_ctrl._count(name, project)
if total:
claimed = self._claim_ctrl._count_messages(name, project)
else:
claimed = 0
message_stats = {
'claimed': claimed,
'free': total - claimed,
'total': total,
}
if total:
try:
newest = self._message_ctrl.first(name, project, -1)
oldest = self._message_ctrl.first(name, project, 1)
except errors.QueueIsEmpty:
pass
else:
message_stats['newest'] = newest
message_stats['oldest'] = oldest
return {'messages': message_stats}

View File

@ -70,10 +70,6 @@ class QueueController(storage.Queue):
use_bin_type=True).pack use_bin_type=True).pack
self._unpacker = functools.partial(msgpack.unpackb, encoding='utf-8') self._unpacker = functools.partial(msgpack.unpackb, encoding='utf-8')
@decorators.lazy_property(write=False)
def _message_ctrl(self):
return self.driver.message_controller
@decorators.lazy_property(write=False) @decorators.lazy_property(write=False)
def _claim_ctrl(self): def _claim_ctrl(self):
return self.driver.claim_controller return self.driver.claim_controller
@ -124,7 +120,7 @@ class QueueController(storage.Queue):
qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project) qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project)
# Check if the queue already exists. # Check if the queue already exists.
if self.exists(name, project): if self._exists(name, project):
return False return False
queue = { queue = {
@ -137,7 +133,6 @@ class QueueController(storage.Queue):
# Pipeline ensures atomic inserts. # Pipeline ensures atomic inserts.
with self._client.pipeline() as pipe: with self._client.pipeline() as pipe:
pipe.zadd(qset_key, 1, queue_key).hmset(queue_key, queue) pipe.zadd(qset_key, 1, queue_key).hmset(queue_key, queue)
self._message_ctrl._create_msgset(name, project, pipe)
try: try:
pipe.execute() pipe.execute()
@ -187,38 +182,9 @@ class QueueController(storage.Queue):
with self._client.pipeline() as pipe: with self._client.pipeline() as pipe:
pipe.zrem(qset_key, queue_key) pipe.zrem(qset_key, queue_key)
pipe.delete(queue_key) pipe.delete(queue_key)
self._message_ctrl._delete_msgset(name, project, pipe)
self._message_ctrl._delete_queue_messages(name, project, pipe)
pipe.execute() pipe.execute()
@utils.raises_conn_error @utils.raises_conn_error
@utils.retries_on_connection_error @utils.retries_on_connection_error
def _stats(self, name, project=None): def _stats(self, name, project=None):
if not self.exists(name, project=project):
raise errors.QueueDoesNotExist(name, project)
total = self._message_ctrl._count(name, project)
if total:
claimed = self._claim_ctrl._count_messages(name, project)
else:
claimed = 0
message_stats = {
'claimed': claimed,
'free': total - claimed,
'total': total,
}
if total:
try:
newest = self._message_ctrl.first(name, project, -1)
oldest = self._message_ctrl.first(name, project, 1)
except errors.QueueIsEmpty:
pass pass
else:
message_stats['newest'] = newest
message_stats['oldest'] = oldest
return {'messages': message_stats}

View File

@ -88,7 +88,7 @@ class ClaimController(storage.Claim):
with self.driver.trans() as trans: with self.driver.trans() as trans:
try: try:
qid = utils.get_qid(self.driver, queue, project) qid = utils.get_qid(self.driver.control_driver, queue, project)
except errors.QueueDoesNotExist: except errors.QueueDoesNotExist:
return None, iter([]) return None, iter([])
@ -136,7 +136,7 @@ class ClaimController(storage.Claim):
age = utils.get_age(tables.Claims.c.created) age = utils.get_age(tables.Claims.c.created)
with self.driver.trans() as trans: with self.driver.trans() as trans:
qid = utils.get_qid(self.driver, queue, project) qid = utils.get_qid(self.driver.control_driver, queue, project)
update = tables.Claims.update().where(sa.and_( update = tables.Claims.update().where(sa.and_(
tables.Claims.c.ttl > age, tables.Claims.c.ttl > age,
@ -168,7 +168,7 @@ class ClaimController(storage.Claim):
try: try:
# NOTE(flaper87): This could probably use some # NOTE(flaper87): This could probably use some
# joins and be just 1 query. # joins and be just 1 query.
qid = utils.get_qid(self.driver, queue, project) qid = utils.get_qid(self.driver.control_driver, queue, project)
except errors.QueueDoesNotExist: except errors.QueueDoesNotExist:
return return

View File

@ -133,7 +133,8 @@ class MessageController(storage.Message):
if project is None: if project is None:
project = '' project = ''
qid = utils.get_qid(self.driver, queue, project) qid = utils.get_qid(self.driver.control_driver,
queue, project)
sel = sa.sql.select([tables.Messages.c.id, sel = sa.sql.select([tables.Messages.c.id,
tables.Messages.c.body, tables.Messages.c.body,
@ -230,7 +231,8 @@ class MessageController(storage.Message):
project = '' project = ''
with self.driver.trans() as trans: with self.driver.trans() as trans:
qid = utils.get_qid(self.driver, queue, project) qid = utils.get_qid(self.driver.control_driver,
queue, project)
# Delete the expired messages # Delete the expired messages
and_stmt = sa.and_(tables.Messages.c.ttl <= and_stmt = sa.and_(tables.Messages.c.ttl <=
@ -310,7 +312,8 @@ class MessageController(storage.Message):
with self.driver.trans() as trans: with self.driver.trans() as trans:
try: try:
qid = utils.get_qid(self.driver, queue, project) qid = utils.get_qid(self.driver.control_driver,
queue, project)
except errors.QueueDoesNotExist: except errors.QueueDoesNotExist:
return return
@ -359,7 +362,8 @@ class MessageController(storage.Message):
statement = tables.Messages.delete() statement = tables.Messages.delete()
qid = utils.get_qid(self.driver, queue_name, project) qid = utils.get_qid(self.driver.control_driver,
queue_name, project)
and_stmt = [tables.Messages.c.id.in_(message_ids), and_stmt = [tables.Messages.c.id.in_(message_ids),
tables.Messages.c.qid == qid] tables.Messages.c.qid == qid]
@ -367,3 +371,50 @@ class MessageController(storage.Message):
trans.execute(statement.where(sa.and_(*and_stmt))) trans.execute(statement.where(sa.and_(*and_stmt)))
return messages return messages
class MessageQueueHandler(object):
def __init__(self, driver, control_driver):
self.driver = driver
def stats(self, name, project):
if project is None:
project = ''
qid = utils.get_qid(self.driver.control_driver, name, project)
sel = sa.sql.select([
sa.sql.select([sa.func.count(tables.Messages.c.id)],
sa.and_(
tables.Messages.c.qid == qid,
tables.Messages.c.cid != (None),
tables.Messages.c.ttl >
sfunc.now() - tables.Messages.c.created)),
sa.sql.select([sa.func.count(tables.Messages.c.id)],
sa.and_(
tables.Messages.c.qid == qid,
tables.Messages.c.cid == (None),
tables.Messages.c.ttl >
sfunc.now() - tables.Messages.c.created))
])
claimed, free = self.driver.get(sel)
total = free + claimed
message_stats = {
'claimed': claimed,
'free': free,
'total': total,
}
try:
message_controller = self.driver.message_controller
oldest = message_controller.first(name, project, sort=1)
newest = message_controller.first(name, project, sort=-1)
except errors.QueueIsEmpty:
pass
else:
message_stats['oldest'] = utils.stat_message(oldest)
message_stats['newest'] = utils.stat_message(newest)
return {'messages': message_stats}

View File

@ -13,7 +13,6 @@
# the License. # the License.
import sqlalchemy as sa import sqlalchemy as sa
from sqlalchemy.sql import func as sfunc
from zaqar import storage from zaqar import storage
from zaqar.storage import errors from zaqar.storage import errors
@ -130,43 +129,4 @@ class QueueController(storage.Queue):
self.driver.run(dlt) self.driver.run(dlt)
def _stats(self, name, project): def _stats(self, name, project):
if project is None:
project = ''
qid = utils.get_qid(self.driver, name, project)
sel = sa.sql.select([
sa.sql.select([sa.func.count(tables.Messages.c.id)],
sa.and_(
tables.Messages.c.qid == qid,
tables.Messages.c.cid != (None),
tables.Messages.c.ttl >
sfunc.now() - tables.Messages.c.created)),
sa.sql.select([sa.func.count(tables.Messages.c.id)],
sa.and_(
tables.Messages.c.qid == qid,
tables.Messages.c.cid == (None),
tables.Messages.c.ttl >
sfunc.now() - tables.Messages.c.created))
])
claimed, free = self.driver.get(sel)
total = free + claimed
message_stats = {
'claimed': claimed,
'free': free,
'total': total,
}
try:
message_controller = self.driver.message_controller
oldest = message_controller.first(name, project, sort=1)
newest = message_controller.first(name, project, sort=-1)
except errors.QueueIsEmpty:
pass pass
else:
message_stats['oldest'] = utils.stat_message(oldest)
message_stats['newest'] = utils.stat_message(newest)
return {'messages': message_stats}

View File

@ -95,7 +95,8 @@ def load_storage_impl(uri, control_mode=False, default_store=None):
raise errors.InvalidDriver(exc) raise errors.InvalidDriver(exc)
def load_storage_driver(conf, cache, storage_type=None, control_mode=False): def load_storage_driver(conf, cache, storage_type=None,
control_mode=False, control_driver=None):
"""Loads a storage driver and returns it. """Loads a storage driver and returns it.
The driver's initializer will be passed conf and cache as The driver's initializer will be passed conf and cache as
@ -110,17 +111,24 @@ def load_storage_driver(conf, cache, storage_type=None, control_mode=False):
:param control_mode: (Default False). Determines which :param control_mode: (Default False). Determines which
driver type to load; if False, the data driver is driver type to load; if False, the data driver is
loaded. If True, the control driver is loaded. loaded. If True, the control driver is loaded.
:param control_driver: (Default None). The control driver
instance to pass to the storage driver. Needed to access
the queue controller, mainly.
""" """
mode = 'control' if control_mode else 'data' mode = 'control' if control_mode else 'data'
driver_type = 'zaqar.{0}.storage'.format(mode) driver_type = 'zaqar.{0}.storage'.format(mode)
storage_type = storage_type or conf['drivers'].storage storage_type = storage_type or conf['drivers'].storage
_invoke_args = [conf, cache]
if control_driver is not None:
_invoke_args.append(control_driver)
try: try:
mgr = driver.DriverManager(driver_type, mgr = driver.DriverManager(driver_type,
storage_type, storage_type,
invoke_on_load=True, invoke_on_load=True,
invoke_args=[conf, cache]) invoke_args=_invoke_args)
return mgr.driver return mgr.driver
@ -178,7 +186,11 @@ def can_connect(uri, conf=None):
# the URI field. This should be sufficient to initialize a # the URI field. This should be sufficient to initialize a
# storage driver. # storage driver.
driver = load_storage_driver(conf, None, driver = load_storage_driver(conf, None,
storage_type=storage_type) storage_type=storage_type,
control_driver=load_storage_driver
(conf, None,
storage_type=storage_type,
control_mode=True))
return driver.is_alive() return driver.is_alive()
except Exception as exc: except Exception as exc:
LOG.debug('Can\'t connect to: %s \n%s' % (uri, exc)) LOG.debug('Can\'t connect to: %s \n%s' % (uri, exc))

View File

@ -17,8 +17,8 @@ from zaqar import storage
class DataDriver(storage.DataDriverBase): class DataDriver(storage.DataDriverBase):
def __init__(self, conf, cache): def __init__(self, conf, cache, control_driver):
super(DataDriver, self).__init__(conf, cache) super(DataDriver, self).__init__(conf, cache, control_driver)
@property @property
def default_options(self): def default_options(self):
@ -36,7 +36,7 @@ class DataDriver(storage.DataDriverBase):
@property @property
def queue_controller(self): def queue_controller(self):
return QueueController(self) return self.control_driver.queue_controller
@property @property
def message_controller(self): def message_controller(self):
@ -56,6 +56,10 @@ class ControlDriver(storage.ControlDriverBase):
def __init__(self, conf, cache): def __init__(self, conf, cache):
super(ControlDriver, self).__init__(conf, cache) super(ControlDriver, self).__init__(conf, cache)
@property
def queue_controller(self):
return QueueController(self)
@property @property
def catalogue_controller(self): def catalogue_controller(self):
return None return None

View File

@ -28,6 +28,7 @@ from testtools import matchers
from zaqar.openstack.common.cache import cache as oslo_cache from zaqar.openstack.common.cache import cache as oslo_cache
from zaqar import storage from zaqar import storage
from zaqar.storage import errors from zaqar.storage import errors
from zaqar.storage import pipeline
from zaqar import tests as testing from zaqar import tests as testing
from zaqar.tests import helpers from zaqar.tests import helpers
@ -58,18 +59,21 @@ class ControllerBaseTest(testing.TestBase):
self.skipTest("Pooling is enabled, " self.skipTest("Pooling is enabled, "
"but control driver class is not specified") "but control driver class is not specified")
self.control = self.control_driver_class(self.conf, cache)
if not pooling: if not pooling:
self.driver = self.driver_class(self.conf, cache) args = [self.conf, cache]
if issubclass(self.driver_class, storage.DataDriverBase):
args.append(self.control)
self.driver = self.driver_class(*args)
else: else:
control = self.control_driver_class(self.conf, cache)
uri = "mongodb://localhost:27017" uri = "mongodb://localhost:27017"
for i in range(4): for i in range(4):
options = {'database': "zaqar_test_pools_" + str(i)} options = {'database': "zaqar_test_pools_" + str(i)}
control.pools_controller.create(six.text_type(i), self.control.pools_controller.create(six.text_type(i),
100, uri, options=options) 100, uri, options=options)
self.driver = self.driver_class(self.conf, cache, control) self.driver = self.driver_class(self.conf, cache, self.control)
self.addCleanup(control.pools_controller.drop_all) self.addCleanup(self.control.pools_controller.drop_all)
self.addCleanup(control.catalogue_controller.drop_all) self.addCleanup(self.control.catalogue_controller.drop_all)
self._prepare_conf() self._prepare_conf()
@ -80,6 +84,10 @@ class ControllerBaseTest(testing.TestBase):
else: else:
self.controller = self.controller_class(self.driver._pool_catalog) self.controller = self.controller_class(self.driver._pool_catalog)
self.pipeline = pipeline.DataDriver(self.conf,
self.driver,
self.control)
def _prepare_conf(self): def _prepare_conf(self):
"""Prepare the conf before running tests """Prepare the conf before running tests
@ -98,9 +106,7 @@ class QueueControllerTest(ControllerBaseTest):
def setUp(self): def setUp(self):
super(QueueControllerTest, self).setUp() super(QueueControllerTest, self).setUp()
self.queue_controller = self.driver.queue_controller self.queue_controller = self.pipeline.queue_controller
self.message_controller = self.driver.message_controller
self.claim_controller = self.driver.claim_controller
@ddt.data(None, ControllerBaseTest.project) @ddt.data(None, ControllerBaseTest.project)
def test_list(self, project): def test_list(self, project):
@ -164,10 +170,144 @@ class QueueControllerTest(ControllerBaseTest):
metadata = self.controller.get('test', project=self.project) metadata = self.controller.get('test', project=self.project)
self.assertEqual(metadata['meta'], 'test_meta') self.assertEqual(metadata['meta'], 'test_meta')
# Test queue deletion
self.controller.delete('test', project=self.project)
# Test queue existence
self.assertFalse(self.controller.exists('test', project=self.project))
class MessageControllerTest(ControllerBaseTest):
"""Message Controller base tests.
NOTE(flaper87): Implementations of this class should
override the tearDown method in order
to clean up storage's state.
"""
queue_name = 'test_queue'
controller_base_class = storage.Message
# Specifies how often expired messages are purged, in sec.
gc_interval = 0
def setUp(self):
super(MessageControllerTest, self).setUp()
# Lets create a queue
self.queue_controller = self.pipeline.queue_controller
self.claim_controller = self.pipeline.claim_controller
self.queue_controller.create(self.queue_name, project=self.project)
def tearDown(self):
self.queue_controller.delete(self.queue_name, project=self.project)
super(MessageControllerTest, self).tearDown()
def test_stats_for_empty_queue(self):
self.addCleanup(self.queue_controller.delete, 'test',
project=self.project)
created = self.queue_controller.create('test', project=self.project)
self.assertTrue(created)
stats = self.queue_controller.stats('test', project=self.project)
message_stats = stats['messages']
self.assertEqual(message_stats['free'], 0)
self.assertEqual(message_stats['claimed'], 0)
self.assertEqual(message_stats['total'], 0)
self.assertNotIn('newest', message_stats)
self.assertNotIn('oldest', message_stats)
def test_queue_count_on_bulk_delete(self):
self.addCleanup(self.queue_controller.delete, 'test-queue',
project=self.project)
queue_name = 'test-queue'
client_uuid = uuid.uuid4() client_uuid = uuid.uuid4()
created = self.queue_controller.create(queue_name,
project=self.project)
self.assertTrue(created)
# Create 10 messages.
msg_keys = _insert_fixtures(self.controller, queue_name,
project=self.project,
client_uuid=client_uuid, num=10)
stats = self.queue_controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 10)
# Delete 5 messages
self.controller.bulk_delete(queue_name, msg_keys[0:5],
self.project)
stats = self.queue_controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 5)
def test_queue_count_on_bulk_delete_with_invalid_id(self):
self.addCleanup(self.queue_controller.delete, 'test-queue',
project=self.project)
queue_name = 'test-queue'
client_uuid = uuid.uuid4()
created = self.queue_controller.create(queue_name,
project=self.project)
self.assertTrue(created)
# Create 10 messages.
msg_keys = _insert_fixtures(self.controller, queue_name,
project=self.project,
client_uuid=client_uuid, num=10)
stats = self.queue_controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 10)
# Delete 5 messages
self.controller.bulk_delete(queue_name,
msg_keys[0:5] + ['invalid'],
self.project)
stats = self.queue_controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 5)
def test_queue_count_on_delete(self):
self.addCleanup(self.queue_controller.delete, 'test-queue',
project=self.project)
queue_name = 'test-queue'
client_uuid = uuid.uuid4()
created = self.queue_controller.create(queue_name,
project=self.project)
self.assertTrue(created)
# Create 10 messages.
msg_keys = _insert_fixtures(self.controller, queue_name,
project=self.project,
client_uuid=client_uuid, num=10)
stats = self.queue_controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 10)
# Delete 1 message
self.controller.delete(queue_name, msg_keys[0], self.project)
stats = self.queue_controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 9)
def test_queue_stats(self):
# Test queue creation
self.addCleanup(self.queue_controller.delete, 'test',
project=self.project)
created = self.queue_controller.create('test',
metadata=dict(meta='test_meta'),
project=self.project)
client_uuid = uuid.uuid4()
# Test queue statistic # Test queue statistic
_insert_fixtures(self.message_controller, 'test', _insert_fixtures(self.controller, 'test',
project=self.project, client_uuid=client_uuid, project=self.project, client_uuid=client_uuid,
num=6) num=6)
@ -176,11 +316,11 @@ class QueueControllerTest(ControllerBaseTest):
# message timestamps (and may not be monkey-patchable). # message timestamps (and may not be monkey-patchable).
time.sleep(1.2) time.sleep(1.2)
_insert_fixtures(self.message_controller, 'test', _insert_fixtures(self.controller, 'test',
project=self.project, client_uuid=client_uuid, project=self.project, client_uuid=client_uuid,
num=6) num=6)
stats = self.controller.stats('test', project=self.project) stats = self.queue_controller.stats('test', project=self.project)
message_stats = stats['messages'] message_stats = stats['messages']
self.assertEqual(message_stats['free'], 12) self.assertEqual(message_stats['free'], 12)
@ -210,120 +350,22 @@ class QueueControllerTest(ControllerBaseTest):
self.assertThat(oldest['created'], self.assertThat(oldest['created'],
matchers.LessThan(newest['created'])) matchers.LessThan(newest['created']))
# Test queue deletion
self.controller.delete('test', project=self.project)
# Test queue existence
self.assertFalse(self.controller.exists('test', project=self.project))
def test_stats_for_empty_queue(self):
self.addCleanup(self.controller.delete, 'test', project=self.project)
created = self.controller.create('test', project=self.project)
self.assertTrue(created)
stats = self.controller.stats('test', project=self.project)
message_stats = stats['messages']
self.assertEqual(message_stats['free'], 0)
self.assertEqual(message_stats['claimed'], 0)
self.assertEqual(message_stats['total'], 0)
self.assertNotIn('newest', message_stats)
self.assertNotIn('oldest', message_stats)
def test_queue_count_on_bulk_delete(self):
self.addCleanup(self.controller.delete, 'test-queue',
project=self.project)
queue_name = 'test-queue'
client_uuid = uuid.uuid4()
created = self.controller.create(queue_name, project=self.project)
self.assertTrue(created)
# Create 10 messages.
msg_keys = _insert_fixtures(self.message_controller, queue_name,
project=self.project,
client_uuid=client_uuid, num=10)
stats = self.controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 10)
# Delete 5 messages
self.message_controller.bulk_delete(queue_name, msg_keys[0:5],
self.project)
stats = self.controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 5)
def test_queue_count_on_bulk_delete_with_invalid_id(self):
self.addCleanup(self.controller.delete, 'test-queue',
project=self.project)
queue_name = 'test-queue'
client_uuid = uuid.uuid4()
created = self.controller.create(queue_name, project=self.project)
self.assertTrue(created)
# Create 10 messages.
msg_keys = _insert_fixtures(self.message_controller, queue_name,
project=self.project,
client_uuid=client_uuid, num=10)
stats = self.controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 10)
# Delete 5 messages
self.message_controller.bulk_delete(queue_name,
msg_keys[0:5] + ['invalid'],
self.project)
stats = self.controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 5)
def test_queue_count_on_delete(self):
self.addCleanup(self.controller.delete, 'test-queue',
project=self.project)
queue_name = 'test-queue'
client_uuid = uuid.uuid4()
created = self.controller.create(queue_name, project=self.project)
self.assertTrue(created)
# Create 10 messages.
msg_keys = _insert_fixtures(self.message_controller, queue_name,
project=self.project,
client_uuid=client_uuid, num=10)
stats = self.controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 10)
# Delete 1 message
self.message_controller.delete(queue_name, msg_keys[0],
self.project)
stats = self.controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 9)
def test_queue_count_on_claim_delete(self): def test_queue_count_on_claim_delete(self):
self.addCleanup(self.controller.delete, 'test-queue', self.addCleanup(self.queue_controller.delete, 'test-queue',
project=self.project) project=self.project)
queue_name = 'test-queue' queue_name = 'test-queue'
client_uuid = uuid.uuid4() client_uuid = uuid.uuid4()
created = self.controller.create(queue_name, project=self.project) created = self.queue_controller.create(queue_name,
project=self.project)
self.assertTrue(created) self.assertTrue(created)
# Create 15 messages. # Create 15 messages.
msg_keys = _insert_fixtures(self.message_controller, queue_name, msg_keys = _insert_fixtures(self.controller, queue_name,
project=self.project, project=self.project,
client_uuid=client_uuid, num=15) client_uuid=client_uuid, num=15)
stats = self.controller.stats(queue_name, stats = self.queue_controller.stats(queue_name,
self.project)['messages'] self.project)['messages']
self.assertEqual(stats['total'], 15) self.assertEqual(stats['total'], 15)
@ -332,24 +374,24 @@ class QueueControllerTest(ControllerBaseTest):
claim_id, _ = self.claim_controller.create(queue_name, metadata, claim_id, _ = self.claim_controller.create(queue_name, metadata,
self.project) self.project)
stats = self.controller.stats(queue_name, stats = self.queue_controller.stats(queue_name,
self.project)['messages'] self.project)['messages']
self.assertEqual(stats['claimed'], 10) self.assertEqual(stats['claimed'], 10)
# Delete one message and ensure stats are updated even # Delete one message and ensure stats are updated even
# thought the claim itself has not been deleted. # thought the claim itself has not been deleted.
self.message_controller.delete(queue_name, msg_keys[0], self.controller.delete(queue_name, msg_keys[0],
self.project, claim_id) self.project, claim_id)
stats = self.controller.stats(queue_name, stats = self.queue_controller.stats(queue_name,
self.project)['messages'] self.project)['messages']
self.assertEqual(stats['total'], 14) self.assertEqual(stats['total'], 14)
self.assertEqual(stats['claimed'], 9) self.assertEqual(stats['claimed'], 9)
self.assertEqual(stats['free'], 5) self.assertEqual(stats['free'], 5)
# Same thing but use bulk_delete interface # Same thing but use bulk_delete interface
self.message_controller.bulk_delete(queue_name, msg_keys[1:3], self.controller.bulk_delete(queue_name, msg_keys[1:3],
self.project) self.project)
stats = self.controller.stats(queue_name, stats = self.queue_controller.stats(queue_name,
self.project)['messages'] self.project)['messages']
self.assertEqual(stats['total'], 12) self.assertEqual(stats['total'], 12)
self.assertEqual(stats['claimed'], 7) self.assertEqual(stats['claimed'], 7)
@ -358,37 +400,11 @@ class QueueControllerTest(ControllerBaseTest):
# Delete the claim # Delete the claim
self.claim_controller.delete(queue_name, claim_id, self.claim_controller.delete(queue_name, claim_id,
self.project) self.project)
stats = self.controller.stats(queue_name, stats = self.queue_controller.stats(queue_name,
self.project)['messages'] self.project)['messages']
self.assertEqual(stats['claimed'], 0) self.assertEqual(stats['claimed'], 0)
class MessageControllerTest(ControllerBaseTest):
"""Message Controller base tests.
NOTE(flaper87): Implementations of this class should
override the tearDown method in order
to clean up storage's state.
"""
queue_name = 'test_queue'
controller_base_class = storage.Message
# Specifies how often expired messages are purged, in sec.
gc_interval = 0
def setUp(self):
super(MessageControllerTest, self).setUp()
# Lets create a queue
self.queue_controller = self.driver.queue_controller
self.claim_controller = self.driver.claim_controller
self.queue_controller.create(self.queue_name, project=self.project)
def tearDown(self):
self.queue_controller.delete(self.queue_name, project=self.project)
super(MessageControllerTest, self).tearDown()
def test_message_lifecycle(self): def test_message_lifecycle(self):
queue_name = self.queue_name queue_name = self.queue_name
@ -729,8 +745,8 @@ class ClaimControllerTest(ControllerBaseTest):
super(ClaimControllerTest, self).setUp() super(ClaimControllerTest, self).setUp()
# Lets create a queue # Lets create a queue
self.queue_controller = self.driver.queue_controller self.queue_controller = self.pipeline.queue_controller
self.message_controller = self.driver.message_controller self.message_controller = self.pipeline.message_controller
self.queue_controller.create(self.queue_name, project=self.project) self.queue_controller.create(self.queue_name, project=self.project)
def tearDown(self): def tearDown(self):

View File

@ -579,7 +579,7 @@ class TestQueueLifecycleMongoDB(QueueLifecycleBaseTest):
storage = self.boot.storage._storage storage = self.boot.storage._storage
connection = storage.connection connection = storage.connection
connection.drop_database(storage.queues_database) connection.drop_database(self.boot.control.queues_database)
for db in storage.message_databases: for db in storage.message_databases:
connection.drop_database(db) connection.drop_database(db)

View File

@ -233,9 +233,10 @@ class TestClaimsMongoDB(ClaimsBaseTest):
def tearDown(self): def tearDown(self):
storage = self.boot.storage._storage storage = self.boot.storage._storage
control = self.boot.control
connection = storage.connection connection = storage.connection
connection.drop_database(storage.queues_database) connection.drop_database(control.queues_database)
for db in storage.message_databases: for db in storage.message_databases:
connection.drop_database(db) connection.drop_database(db)

View File

@ -348,7 +348,7 @@ class TestQueueLifecycleMongoDB(QueueLifecycleBaseTest):
storage = self.boot.storage._storage storage = self.boot.storage._storage
connection = storage.connection connection = storage.connection
connection.drop_database(storage.queues_database) connection.drop_database(self.boot.control.queues_database)
for db in storage.message_databases: for db in storage.message_databases:
connection.drop_database(db) connection.drop_database(db)

View File

@ -286,9 +286,10 @@ class TestClaimsMongoDB(ClaimsBaseTest):
def tearDown(self): def tearDown(self):
storage = self.boot.storage._storage storage = self.boot.storage._storage
control = self.boot.control
connection = storage.connection connection = storage.connection
connection.drop_database(storage.queues_database) connection.drop_database(control.queues_database)
for db in storage.message_databases: for db in storage.message_databases:
connection.drop_database(db) connection.drop_database(db)

View File

@ -329,7 +329,7 @@ class TestQueueLifecycleMongoDB(QueueLifecycleBaseTest):
storage = self.boot.storage._storage storage = self.boot.storage._storage
connection = storage.connection connection = storage.connection
connection.drop_database(storage.queues_database) connection.drop_database(self.boot.control.queues_database)
for db in storage.message_databases: for db in storage.message_databases:
connection.drop_database(db) connection.drop_database(db)

View File

@ -286,9 +286,10 @@ class TestClaimsMongoDB(ClaimsBaseTest):
def tearDown(self): def tearDown(self):
storage = self.boot.storage._storage storage = self.boot.storage._storage
control = self.boot.control
connection = storage.connection connection = storage.connection
connection.drop_database(storage.queues_database) connection.drop_database(control.queues_database)
for db in storage.message_databases: for db in storage.message_databases:
connection.drop_database(db) connection.drop_database(db)

View File

@ -326,10 +326,11 @@ class TestQueueLifecycleMongoDB(QueueLifecycleBaseTest):
super(TestQueueLifecycleMongoDB, self).setUp() super(TestQueueLifecycleMongoDB, self).setUp()
def tearDown(self): def tearDown(self):
control = self.boot.control
storage = self.boot.storage._storage storage = self.boot.storage._storage
connection = storage.connection connection = storage.connection
connection.drop_database(storage.queues_database) connection.drop_database(control.queues_database)
for db in storage.message_databases: for db in storage.message_databases:
connection.drop_database(db) connection.drop_database(db)