From a1163331fcb3253ae6af5e4cb8c528eb3679baf1 Mon Sep 17 00:00:00 2001 From: Flavio Percoco Date: Wed, 24 Sep 2014 16:51:38 +0200 Subject: [PATCH] Add first reliability enforcement This patch adds an reliability enforcement for mongodb's driver. It forces deployers to use replicasets or mongos as a mongodb cluster for Zaqar. In addition to that, it forces deployers to provide a write concern > 2 and/or majority. If none of this are met, the driver will raise a RuntimeException and fail to start. If no write concern is provided, majority will be used. Change-Id: Ie74a4b441654243b3ed7e7fd6c40863969cd446d Closes-bug: #1372335 --- tests/etc/wsgi_mongodb.conf | 3 +- tests/etc/wsgi_mongodb_pooled.conf | 1 + tests/unit/common/storage/test_utils.py | 16 +++++- .../unit/queues/storage/test_impl_mongodb.py | 57 +++++++++++++++++++ tests/unit/queues/storage/test_pool_queues.py | 12 ++-- zaqar/queues/bootstrap.py | 12 ++++ zaqar/queues/storage/mongodb/driver.py | 27 ++++++++- zaqar/queues/storage/pooling.py | 3 +- zaqar/queues/storage/utils.py | 3 +- zaqar/queues/transport/wsgi/v1_0/pools.py | 7 ++- zaqar/queues/transport/wsgi/v1_1/pools.py | 7 ++- zaqar/tests/base.py | 4 ++ zaqar/tests/queues/storage/base.py | 2 - zaqar/tests/queues/transport/wsgi/base.py | 2 + 14 files changed, 138 insertions(+), 18 deletions(-) diff --git a/tests/etc/wsgi_mongodb.conf b/tests/etc/wsgi_mongodb.conf index 8b5cdf2fc..c2da76d24 100644 --- a/tests/etc/wsgi_mongodb.conf +++ b/tests/etc/wsgi_mongodb.conf @@ -1,6 +1,7 @@ [DEFAULT] debug = False verbose = False +unreliable = True [drivers] transport = wsgi @@ -13,4 +14,4 @@ port = 8888 uri = mongodb://127.0.0.1:27017 database = zaqar_test max_reconnect_attempts = 3 -reconnect_sleep = 0.001 \ No newline at end of file +reconnect_sleep = 0.001 diff --git a/tests/etc/wsgi_mongodb_pooled.conf b/tests/etc/wsgi_mongodb_pooled.conf index f9db7a314..abe589e3f 100644 --- a/tests/etc/wsgi_mongodb_pooled.conf +++ b/tests/etc/wsgi_mongodb_pooled.conf @@ -1,6 +1,7 @@ [DEFAULT] pooling = True admin_mode = True +unreliable = True [drivers] transport = wsgi diff --git a/tests/unit/common/storage/test_utils.py b/tests/unit/common/storage/test_utils.py index 4b70d26b0..fa1bab7c2 100644 --- a/tests/unit/common/storage/test_utils.py +++ b/tests/unit/common/storage/test_utils.py @@ -15,6 +15,7 @@ import ddt +from zaqar.queues import bootstrap from zaqar.queues.storage import utils from zaqar import tests as testing @@ -22,9 +23,15 @@ from zaqar import tests as testing @ddt.ddt class TestUtils(testing.TestBase): + def setUp(self): + super(TestUtils, self).setUp() + self.conf.register_opts(bootstrap._GENERAL_OPTIONS) + @testing.requires_mongodb def test_can_connect_suceeds_if_good_uri_mongo(self): - self.assertTrue(utils.can_connect('mongodb://localhost:27017')) + self.config(unreliable=True) + self.assertTrue(utils.can_connect('mongodb://localhost:27017', + conf=self.conf)) @testing.requires_redis def test_can_connect_suceeds_if_good_uri_redis(self): @@ -39,8 +46,11 @@ class TestUtils(testing.TestBase): @testing.requires_mongodb def test_can_connect_fails_if_bad_uri_mongodb(self): - self.assertFalse(utils.can_connect('mongodb://localhost:8080')) - self.assertFalse(utils.can_connect('mongodb://example.com:27017')) + self.config(unreliable=True) + self.assertFalse(utils.can_connect('mongodb://localhost:8080', + conf=self.conf)) + self.assertFalse(utils.can_connect('mongodb://example.com:27017', + conf=self.conf)) @testing.requires_redis def test_can_connect_fails_if_bad_uri_redis(self): diff --git a/tests/unit/queues/storage/test_impl_mongodb.py b/tests/unit/queues/storage/test_impl_mongodb.py index 33752c70f..a2ddcc88d 100644 --- a/tests/unit/queues/storage/test_impl_mongodb.py +++ b/tests/unit/queues/storage/test_impl_mongodb.py @@ -26,6 +26,7 @@ import six from testtools import matchers from zaqar.openstack.common.cache import cache as oslo_cache +from zaqar.queues import bootstrap from zaqar.queues import storage from zaqar.queues.storage import errors from zaqar.queues.storage import mongodb @@ -140,7 +141,14 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase): config_file = 'wsgi_mongodb.conf' + def setUp(self): + super(MongodbDriverTest, self).setUp() + + self.conf.register_opts(bootstrap._GENERAL_OPTIONS) + self.config(unreliable=False) + def test_db_instance(self): + self.config(unreliable=True) cache = oslo_cache.get_cache() driver = mongodb.DataDriver(self.conf, cache) @@ -152,6 +160,7 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase): driver.mongodb_conf.database)) def test_version_match(self): + self.config(unreliable=True) cache = oslo_cache.get_cache() with mock.patch('pymongo.MongoClient.server_info') as info: @@ -166,6 +175,54 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase): except RuntimeError: self.fail('version match failed') + def test_replicaset_or_mongos_needed(self): + cache = oslo_cache.get_cache() + + with mock.patch('pymongo.MongoClient.nodes') as nodes: + nodes.__get__ = mock.Mock(return_value=[]) + with mock.patch('pymongo.MongoClient.is_mongos') as is_mongos: + is_mongos.__get__ = mock.Mock(return_value=False) + self.assertRaises(RuntimeError, mongodb.DataDriver, + self.conf, cache) + + def test_using_replset(self): + cache = oslo_cache.get_cache() + + with mock.patch('pymongo.MongoClient.nodes') as nodes: + nodes.__get__ = mock.Mock(return_value=['node1', 'node2']) + mongodb.DataDriver(self.conf, cache) + + def test_using_mongos(self): + cache = oslo_cache.get_cache() + + with mock.patch('pymongo.MongoClient.is_mongos') as is_mongos: + is_mongos.__get__ = mock.Mock(return_value=True) + mongodb.DataDriver(self.conf, cache) + + def test_write_concern_check_works(self): + cache = oslo_cache.get_cache() + + with mock.patch('pymongo.MongoClient.is_mongos') as is_mongos: + is_mongos.__get__ = mock.Mock(return_value=True) + + with mock.patch('pymongo.MongoClient.write_concern') as wc: + wc.__get__ = mock.Mock(return_value={'w': 1}) + self.assertRaises(RuntimeError, mongodb.DataDriver, + self.conf, cache) + + wc.__get__ = mock.Mock(return_value={'w': 2}) + mongodb.DataDriver(self.conf, cache) + + def test_write_concern_is_set(self): + cache = oslo_cache.get_cache() + + with mock.patch('pymongo.MongoClient.is_mongos') as is_mongos: + is_mongos.__get__ = mock.Mock(return_value=True) + driver = mongodb.DataDriver(self.conf, cache) + wc = driver.connection.write_concern + self.assertEqual(wc['w'], 'majority') + self.assertEqual(wc['j'], False) + @testing.requires_mongodb class MongodbQueueTests(MongodbSetupMixin, base.QueueControllerTest): diff --git a/tests/unit/queues/storage/test_pool_queues.py b/tests/unit/queues/storage/test_pool_queues.py index 5b2b0414c..144044d31 100644 --- a/tests/unit/queues/storage/test_pool_queues.py +++ b/tests/unit/queues/storage/test_pool_queues.py @@ -27,17 +27,19 @@ from zaqar import tests as testing @testing.requires_mongodb class PoolQueuesTest(testing.TestBase): + config_file = 'wsgi_mongodb_pooled.conf' + def setUp(self): super(PoolQueuesTest, self).setUp() - conf = self.load_conf('wsgi_mongodb_pooled.conf') - conf.register_opts([cfg.StrOpt('storage')], - group='drivers') + self.conf.register_opts([cfg.StrOpt('storage')], + group='drivers') cache = oslo_cache.get_cache() - control = utils.load_storage_driver(conf, cache, control_mode=True) + control = utils.load_storage_driver(self.conf, cache, + control_mode=True) self.pools_ctrl = control.pools_controller - self.driver = pooling.DataDriver(conf, cache, control) + self.driver = pooling.DataDriver(self.conf, cache, control) self.controller = self.driver.queue_controller # fake two pools diff --git a/zaqar/queues/bootstrap.py b/zaqar/queues/bootstrap.py index d1b40b8d2..1ff535316 100644 --- a/zaqar/queues/bootstrap.py +++ b/zaqar/queues/bootstrap.py @@ -18,6 +18,7 @@ from stevedore import driver from zaqar.common import decorators from zaqar.common import errors +from zaqar.i18n import _ from zaqar.openstack.common.cache import cache as oslo_cache from zaqar.openstack.common import log from zaqar.queues.storage import pipeline @@ -49,6 +50,9 @@ _GENERAL_OPTIONS = ( 'configuration is used to determine where the ' 'catalogue/control plane data is kept.'), deprecated_opts=[cfg.DeprecatedOpt('pooling')]), + + cfg.BoolOpt('unreliable', default=None, + help=('Disable all reliability constrains.')), ) _DRIVER_OPTIONS = ( @@ -81,6 +85,14 @@ class Bootstrap(object): log.setup('zaqar') + if self.conf.unreliable is None: + msg = _(u'Unreliable\'s default value will be changed to False ' + 'in the Kilo release. Please, make sure your deployments ' + 'are working in a reliable mode or that `unreliable` is ' + 'explicitly set to `True` in your configuration files.') + LOG.warn(msg) + self.conf.unreliable = True + @decorators.lazy_property(write=False) def storage(self): LOG.debug(u'Loading storage driver') diff --git a/zaqar/queues/storage/mongodb/driver.py b/zaqar/queues/storage/mongodb/driver.py index f74b6039b..b4fecde83 100644 --- a/zaqar/queues/storage/mongodb/driver.py +++ b/zaqar/queues/storage/mongodb/driver.py @@ -72,11 +72,36 @@ class DataDriver(storage.DataDriverBase): self.mongodb_conf = self.conf[options.MONGODB_GROUP] - server_version = self.connection.server_info()['version'] + conn = self.connection + server_version = conn.server_info()['version'] + if tuple(map(int, server_version.split('.'))) < (2, 2): raise RuntimeError(_('The mongodb driver requires mongodb>=2.2, ' '%s found') % server_version) + if not len(conn.nodes) > 1 and not conn.is_mongos: + if not self.conf.unreliable: + raise RuntimeError(_('Either a replica set or a mongos is ' + 'required to guarantee message delivery')) + else: + wc = conn.write_concern.get('w') + majority = (wc == 'majority' or + wc >= 2) + + if not wc: + # NOTE(flaper87): No write concern specified, use majority + # and don't count journal as a replica. Use `update` to avoid + # overwriting `wtimeout` + conn.write_concern.update({'w': 'majority'}) + elif not self.conf.unreliable and not majority: + raise RuntimeError(_('Using a write concern other than ' + '`majority` or > 2 make sthe service ' + 'unreliable. Please use a different ' + 'write concern or set `unreliable` ' + 'to True in the config file.')) + + conn.write_concern['j'] = False + def is_alive(self): try: # NOTE(zyuan): Requires admin access to mongodb diff --git a/zaqar/queues/storage/pooling.py b/zaqar/queues/storage/pooling.py index 11722c248..2c4095dc6 100644 --- a/zaqar/queues/storage/pooling.py +++ b/zaqar/queues/storage/pooling.py @@ -399,7 +399,8 @@ class Catalog(object): :rtype: zaqar.queues.storage.base.DataDriverBase """ pool = self._pools_ctrl.get(pool_id, detailed=True) - conf = utils.dynamic_conf(pool['uri'], pool['options']) + conf = utils.dynamic_conf(pool['uri'], pool['options'], + conf=self._conf) return utils.load_storage_driver(conf, self._cache) @decorators.caches(_pool_cache_key, _POOL_CACHE_TTL) diff --git a/zaqar/queues/storage/utils.py b/zaqar/queues/storage/utils.py index 81b91896d..c56af70d5 100644 --- a/zaqar/queues/storage/utils.py +++ b/zaqar/queues/storage/utils.py @@ -49,7 +49,7 @@ def dynamic_conf(uri, options, conf=None): if conf is None: conf = cfg.ConfigOpts() else: - conf = copy.deepcopy(conf) + conf = copy.copy(conf) if storage_group not in conf: conf.register_opts(storage_opts, @@ -60,6 +60,7 @@ def dynamic_conf(uri, options, conf=None): driver_opts = utils.dict_to_conf({'storage': storage_type}) conf.register_opts(driver_opts, group=u'drivers') + conf.set_override('storage', storage_type, 'drivers') conf.set_override('uri', uri, group=storage_group) return conf diff --git a/zaqar/queues/transport/wsgi/v1_0/pools.py b/zaqar/queues/transport/wsgi/v1_0/pools.py index 1fd5ad192..bb8089a4b 100644 --- a/zaqar/queues/transport/wsgi/v1_0/pools.py +++ b/zaqar/queues/transport/wsgi/v1_0/pools.py @@ -150,9 +150,10 @@ class Resource(object): LOG.debug(u'PUT pool - name: %s', pool) + conf = self._ctrl.driver.conf data = wsgi_utils.load(request) wsgi_utils.validate(self._validators['create'], data) - if not storage_utils.can_connect(data['uri']): + if not storage_utils.can_connect(data['uri'], conf=conf): raise wsgi_errors.HTTPBadRequestBody( 'cannot connect to %s' % data['uri'] ) @@ -199,7 +200,9 @@ class Resource(object): for field in EXPECT: wsgi_utils.validate(self._validators[field], data) - if 'uri' in data and not storage_utils.can_connect(data['uri']): + conf = self._ctrl.driver.conf + if 'uri' in data and not storage_utils.can_connect(data['uri'], + conf=conf): raise wsgi_errors.HTTPBadRequestBody( 'cannot connect to %s' % data['uri'] ) diff --git a/zaqar/queues/transport/wsgi/v1_1/pools.py b/zaqar/queues/transport/wsgi/v1_1/pools.py index f53d0e78f..0342c6a27 100644 --- a/zaqar/queues/transport/wsgi/v1_1/pools.py +++ b/zaqar/queues/transport/wsgi/v1_1/pools.py @@ -152,9 +152,10 @@ class Resource(object): LOG.debug(u'PUT pool - name: %s', pool) + conf = self._ctrl.driver.conf data = wsgi_utils.load(request) wsgi_utils.validate(self._validators['create'], data) - if not storage_utils.can_connect(data['uri']): + if not storage_utils.can_connect(data['uri'], conf=conf): raise wsgi_errors.HTTPBadRequestBody( 'cannot connect to %s' % data['uri'] ) @@ -212,7 +213,9 @@ class Resource(object): for field in EXPECT: wsgi_utils.validate(self._validators[field], data) - if 'uri' in data and not storage_utils.can_connect(data['uri']): + conf = self._ctrl.driver.conf + if 'uri' in data and not storage_utils.can_connect(data['uri'], + conf=conf): raise wsgi_errors.HTTPBadRequestBody( 'cannot connect to %s' % data['uri'] ) diff --git a/zaqar/tests/base.py b/zaqar/tests/base.py index 1faa5ac45..78c4b43ab 100644 --- a/zaqar/tests/base.py +++ b/zaqar/tests/base.py @@ -20,6 +20,8 @@ from oslo.config import cfg import six import testtools +from zaqar.queues import bootstrap + class TestBase(testtools.TestCase): """Child class of testtools.TestCase for testing Zaqar. @@ -46,6 +48,8 @@ class TestBase(testtools.TestCase): else: self.conf = cfg.ConfigOpts() + self.conf.register_opts(bootstrap._GENERAL_OPTIONS) + @classmethod def conf_path(cls, filename): """Returns the full path to the specified Zaqar conf file. diff --git a/zaqar/tests/queues/storage/base.py b/zaqar/tests/queues/storage/base.py index 3d002b459..34119d4a9 100644 --- a/zaqar/tests/queues/storage/base.py +++ b/zaqar/tests/queues/storage/base.py @@ -24,7 +24,6 @@ import six from testtools import matchers from zaqar.openstack.common.cache import cache as oslo_cache -from zaqar.queues import bootstrap from zaqar.queues import storage from zaqar.queues.storage import errors from zaqar import tests as testing @@ -52,7 +51,6 @@ class ControllerBaseTest(testing.TestBase): oslo_cache.register_oslo_configs(self.conf) cache = oslo_cache.get_cache(self.conf.cache_url) - self.conf.register_opts(bootstrap._GENERAL_OPTIONS) pooling = 'pooling' in self.conf and self.conf.pooling if pooling and not self.control_driver_class: self.skipTest("Pooling is enabled, " diff --git a/zaqar/tests/queues/transport/wsgi/base.py b/zaqar/tests/queues/transport/wsgi/base.py index a20df4c14..d391620a2 100644 --- a/zaqar/tests/queues/transport/wsgi/base.py +++ b/zaqar/tests/queues/transport/wsgi/base.py @@ -33,6 +33,7 @@ class TestBase(testing.TestBase): if not self.config_file: self.skipTest("No config specified") + self.conf.register_opts(bootstrap._GENERAL_OPTIONS) self.conf.register_opts(validation._TRANSPORT_LIMITS_OPTIONS, group=validation._TRANSPORT_LIMITS_GROUP) self.transport_cfg = self.conf[validation._TRANSPORT_LIMITS_GROUP] @@ -41,6 +42,7 @@ class TestBase(testing.TestBase): group=driver._WSGI_GROUP) self.wsgi_cfg = self.conf[driver._WSGI_GROUP] + self.conf.unreliable = True self.conf.admin_mode = True self.boot = bootstrap.Bootstrap(self.conf)