280da9d054
Whenever an admin attempted to register a new shard, the shard was
admitted whether or not the shard URI actually existed, and whether or
not marconi could handle shards of that type.

This patch makes it so that a DataDriver instance is temporarily
instantiated prior to storing the shard entry. This instance uses
is_alive() to determine whether the shard can be reached and is of a
supported type. This applies to both PUT and PATCH. As a result,
HTTP 400 is returned if the registration fails.

To enable this change, the ability to create a dynamic configuration
object was extracted from sharding.Catalog, and tests were written for
this new function. The unit tests were updated accordingly, expanding
the coverage of test_shards to include the new failure cases.

Some changes were needed to handle the new oslo.cache interface:

- cache.unset doesn't exist; now using cache.unset_many
- cache.get_cache() takes a URI, not a ConfigOpts

Change-Id: I40509f525466da01baad2a5aef81c7b99c8d2f97
Closes-Bug: #1258591
Closes-Bug: #1273376
Closes-Bug: #1273377
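The validation idea described above can be sketched roughly as follows. This is a minimal illustration only: it assumes a config object already prepared for the shard's URI (the step the patch extracts from sharding.Catalog), uses the MongoDB driver from the test module below as a stand-in for whichever backend the URI maps to, and the helper name _shard_is_reachable is hypothetical rather than an identifier from the patch.

from marconi.openstack.common.cache import cache as oslo_cache
from marconi.queues.storage import mongodb


def _shard_is_reachable(conf):
    """Temporarily instantiate a DataDriver and probe it with is_alive().

    A False result is what the PUT/PATCH shard handlers would translate
    into an HTTP 400 response instead of storing the shard entry.
    """
    # New oslo.cache interface: get_cache() takes a cache URI (or falls
    # back to a default), rather than a ConfigOpts object.
    cache = oslo_cache.get_cache()

    try:
        driver = mongodb.DataDriver(conf, cache)
        return driver.is_alive()
    except Exception:
        # Unreachable URI or an unsupported shard type
        return False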
367 lines
13 KiB
Python
# Copyright (c) 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import time
import uuid

import mock
from pymongo import cursor
import pymongo.errors
from testtools import matchers

from marconi.openstack.common.cache import cache as oslo_cache
from marconi.openstack.common import timeutils
from marconi.queues import storage
from marconi.queues.storage import errors
from marconi.queues.storage import mongodb
from marconi.queues.storage.mongodb import controllers
from marconi.queues.storage.mongodb import options
from marconi.queues.storage.mongodb import utils
from marconi import tests as testing
from marconi.tests.queues.storage import base


def _cleanup_databases(controller):
    databases = (controller.driver.message_databases +
                 [controller.driver.queues_database])

    for db in databases:
        controller.driver.connection.drop_database(db)


class MongodbUtilsTest(testing.TestBase):

    def test_scope_queue_name(self):
        self.assertEqual(utils.scope_queue_name('my-q'), '/my-q')
        self.assertEqual(utils.scope_queue_name('my-q', None), '/my-q')
        self.assertEqual(utils.scope_queue_name('my-q', '123'), '123/my-q')

        self.assertEqual(utils.scope_queue_name(None), '/')
        self.assertEqual(utils.scope_queue_name(None, '123'), '123/')

    def test_descope_queue_name(self):
        self.assertEqual(utils.descope_queue_name('/'), None)
        self.assertEqual(utils.descope_queue_name('/some-pig'), 'some-pig')
        self.assertEqual(utils.descope_queue_name('radiant/some-pig'),
                         'some-pig')

    def test_calculate_backoff(self):
        sec = utils.calculate_backoff(0, 10, 2, 0)
        self.assertEqual(sec, 0)

        sec = utils.calculate_backoff(9, 10, 2, 0)
        self.assertEqual(sec, 1.8)

        sec = utils.calculate_backoff(4, 10, 2, 0)
        self.assertEqual(sec, 0.8)

        sec = utils.calculate_backoff(4, 10, 2, 1)
        if sec != 0.8:
            self.assertThat(sec, matchers.GreaterThan(0.8))
            self.assertThat(sec, matchers.LessThan(1.8))

        self.assertRaises(ValueError, utils.calculate_backoff, 0, 10, -2, -1)
        self.assertRaises(ValueError, utils.calculate_backoff, 0, 10, -2, 0)
        self.assertRaises(ValueError, utils.calculate_backoff, 0, 10, 2, -1)

        self.assertRaises(ValueError, utils.calculate_backoff, -2, -10, 2, 0)
        self.assertRaises(ValueError, utils.calculate_backoff, 2, -10, 2, 0)
        self.assertRaises(ValueError, utils.calculate_backoff, -2, 10, 2, 0)
        self.assertRaises(ValueError, utils.calculate_backoff, -1, 10, 2, 0)
        self.assertRaises(ValueError, utils.calculate_backoff, 10, 10, 2, 0)
        self.assertRaises(ValueError, utils.calculate_backoff, 11, 10, 2, 0)


@testing.requires_mongodb
class MongodbDriverTest(testing.TestBase):

    config_file = 'wsgi_mongodb.conf'

    def _purge_databases(self):
        _cleanup_databases(self)

    def test_db_instance(self):
        cache = oslo_cache.get_cache()
        driver = mongodb.DataDriver(self.conf, cache)

        databases = (driver.message_databases +
                     [driver.queues_database])

        for db in databases:
            self.assertThat(db.name, matchers.StartsWith(
                driver.mongodb_conf.database))


@testing.requires_mongodb
class MongodbQueueTests(base.QueueControllerTest):

    driver_class = mongodb.DataDriver
    config_file = 'wsgi_mongodb.conf'
    controller_class = controllers.QueueController

    def _purge_databases(self):
        _cleanup_databases(self)

    def _prepare_conf(self):
        self.config(options.MONGODB_GROUP,
                    database=uuid.uuid4().hex)

    def test_indexes(self):
        collection = self.controller._collection
        indexes = collection.index_information()
        self.assertIn('p_q_1', indexes)

    def test_messages_purged(self):
        queue_name = 'test'
        self.controller.create(queue_name)
        self.message_controller.post(queue_name,
                                     [{'ttl': 60}],
                                     1234)
        self.controller.delete(queue_name)
        for collection in self.message_controller._collections:
            self.assertEqual(collection.find({'q': queue_name}).count(), 0)

    def test_raises_connection_error(self):

        with mock.patch.object(cursor.Cursor, 'next', autospec=True) as method:
            error = pymongo.errors.ConnectionFailure()
            method.side_effect = error

            queues = next(self.controller.list())
            self.assertRaises(storage.errors.ConnectionError,
                              queues.next)


@testing.requires_mongodb
class MongodbMessageTests(base.MessageControllerTest):

    driver_class = mongodb.DataDriver
    config_file = 'wsgi_mongodb.conf'
    controller_class = controllers.MessageController

    # NOTE(kgriffs): MongoDB's TTL scavenger only runs once a minute
    gc_interval = 60

    def _purge_databases(self):
        _cleanup_databases(self)

    def _prepare_conf(self):
        self.config(options.MONGODB_GROUP,
                    database=uuid.uuid4().hex)

    def test_indexes(self):
        for collection in self.controller._collections:
            indexes = collection.index_information()
            self.assertIn('active', indexes)
            self.assertIn('claimed', indexes)
            self.assertIn('queue_marker', indexes)
            self.assertIn('counting', indexes)

    def test_message_counter(self):
        queue_name = 'marker_test'
        iterations = 10

        self.queue_controller.create(queue_name)

        seed_marker1 = self.queue_controller._get_counter(queue_name)
        self.assertEqual(seed_marker1, 1, 'First marker is 1')

        for i in range(iterations):
            self.controller.post(queue_name, [{'ttl': 60}], 'uuid')

            marker1 = self.queue_controller._get_counter(queue_name)
            marker2 = self.queue_controller._get_counter(queue_name)
            marker3 = self.queue_controller._get_counter(queue_name)

            self.assertEqual(marker1, marker2)
            self.assertEqual(marker2, marker3)
            self.assertEqual(marker1, i + 2)

        new_value = self.queue_controller._inc_counter(queue_name)
        self.assertIsNotNone(new_value)

        value_before = self.queue_controller._get_counter(queue_name)
        new_value = self.queue_controller._inc_counter(queue_name)
        self.assertIsNotNone(new_value)
        value_after = self.queue_controller._get_counter(queue_name)
        self.assertEqual(value_after, value_before + 1)

        value_before = value_after
        new_value = self.queue_controller._inc_counter(queue_name, amount=7)
        value_after = self.queue_controller._get_counter(queue_name)
        self.assertEqual(value_after, value_before + 7)
        self.assertEqual(value_after, new_value)

        reference_value = value_after

        unchanged = self.queue_controller._inc_counter(queue_name, window=10)
        self.assertIsNone(unchanged)

        now = timeutils.utcnow() + datetime.timedelta(seconds=10)
        timeutils_utcnow = 'marconi.openstack.common.timeutils.utcnow'
        with mock.patch(timeutils_utcnow) as mock_utcnow:
            mock_utcnow.return_value = now
            changed = self.queue_controller._inc_counter(queue_name, window=5)
            self.assertEqual(changed, reference_value + 1)

    def test_race_condition_on_post(self):
        queue_name = 'marker_test'
        self.queue_controller.create(queue_name)

        expected_messages = [
            {
                'ttl': 60,
                'body': {
                    'event': 'BackupStarted',
                    'backupId': 'c378813c-3f0b-11e2-ad92-7823d2b0f3ce',
                },
            },
            {
                'ttl': 60,
                'body': {
                    'event': 'BackupStarted',
                    'backupId': 'd378813c-3f0b-11e2-ad92-7823d2b0f3ce',
                },
            },
            {
                'ttl': 60,
                'body': {
                    'event': 'BackupStarted',
                    'backupId': 'e378813c-3f0b-11e2-ad92-7823d2b0f3ce',
                },
            },
        ]

        uuid = '97b64000-2526-11e3-b088-d85c1300734c'

        # NOTE(kgriffs): Patch _inc_counter so it is a noop, so that
        # the second time we post, we will get a collision. This simulates
        # what happens when we have parallel requests and the "winning"
        # request hasn't gotten around to calling _inc_counter before the
        # "losing" request attempts to insert its batch of messages.
        with mock.patch.object(mongodb.queues.QueueController,
                               '_inc_counter', autospec=True) as method:

            method.return_value = 2
            messages = expected_messages[:1]
            created = list(self.controller.post(queue_name, messages, uuid))
            self.assertEqual(len(created), 1)

            # Force infinite retries
            if testing.RUN_SLOW_TESTS:
                method.return_value = None

                with testing.expect(errors.MessageConflict):
                    self.controller.post(queue_name, messages, uuid)

        created = list(self.controller.post(queue_name,
                                            expected_messages[1:],
                                            uuid))

        self.assertEqual(len(created), 2)

        expected_ids = [m['body']['backupId'] for m in expected_messages]

        interaction = self.controller.list(queue_name, client_uuid=uuid,
                                           echo=True)

        actual_messages = list(next(interaction))
        self.assertEqual(len(actual_messages), len(expected_messages))
        actual_ids = [m['body']['backupId'] for m in actual_messages]

        self.assertEqual(actual_ids, expected_ids)

    def test_empty_queue_exception(self):
        queue_name = 'empty-queue-test'
        self.queue_controller.create(queue_name)

        self.assertRaises(storage.errors.QueueIsEmpty,
                          self.controller.first, queue_name)

    def test_invalid_sort_option(self):
        queue_name = 'empty-queue-test'
        self.queue_controller.create(queue_name)

        self.assertRaises(ValueError,
                          self.controller.first, queue_name, sort=0)


@testing.requires_mongodb
class MongodbClaimTests(base.ClaimControllerTest):

    driver_class = mongodb.DataDriver
    config_file = 'wsgi_mongodb.conf'
    controller_class = controllers.ClaimController

    def _purge_databases(self):
        _cleanup_databases(self)

    def _prepare_conf(self):
        self.config(options.MONGODB_GROUP,
                    database=uuid.uuid4().hex)

    def test_claim_doesnt_exist(self):
        """Verifies that operations fail on expired/missing claims.

        Methods should raise an exception when the claim doesn't
        exist and/or has expired.
        """
        epoch = '000000000000000000000000'
        self.assertRaises(storage.errors.ClaimDoesNotExist,
                          self.controller.get, self.queue_name,
                          epoch, project=self.project)

        claim_id, messages = self.controller.create(self.queue_name,
                                                    {'ttl': 1, 'grace': 0},
                                                    project=self.project)

        # Let's let it expire
        time.sleep(1)
        self.assertRaises(storage.errors.ClaimDoesNotExist,
                          self.controller.update, self.queue_name,
                          claim_id, {}, project=self.project)

        self.assertRaises(storage.errors.ClaimDoesNotExist,
                          self.controller.update, self.queue_name,
                          claim_id, {}, project=self.project)


@testing.requires_mongodb
class MongodbShardsTests(base.ShardsControllerTest):
    driver_class = mongodb.ControlDriver
    controller_class = controllers.ShardsController

    def setUp(self):
        super(MongodbShardsTests, self).setUp()
        self.load_conf('wsgi_mongodb.conf')

    def tearDown(self):
        super(MongodbShardsTests, self).tearDown()


@testing.requires_mongodb
class MongodbCatalogueTests(base.CatalogueControllerTest):
    driver_class = mongodb.ControlDriver
    controller_class = controllers.CatalogueController

    def setUp(self):
        super(MongodbCatalogueTests, self).setUp()
        self.load_conf('wsgi_mongodb.conf')

    def tearDown(self):
        self.controller.drop_all()
        super(MongodbCatalogueTests, self).tearDown()