Add first reliability enforcement
This patch adds an reliability enforcement for mongodb's driver. It forces deployers to use replicasets or mongos as a mongodb cluster for Zaqar. In addition to that, it forces deployers to provide a write concern > 2 and/or majority. If none of this are met, the driver will raise a RuntimeException and fail to start. If no write concern is provided, majority will be used. Change-Id: Ie74a4b441654243b3ed7e7fd6c40863969cd446d Closes-bug: #1372335
This commit is contained in:
parent
2d3ee77efe
commit
a1163331fc
@ -1,6 +1,7 @@
|
||||
[DEFAULT]
|
||||
debug = False
|
||||
verbose = False
|
||||
unreliable = True
|
||||
|
||||
[drivers]
|
||||
transport = wsgi
|
||||
@ -13,4 +14,4 @@ port = 8888
|
||||
uri = mongodb://127.0.0.1:27017
|
||||
database = zaqar_test
|
||||
max_reconnect_attempts = 3
|
||||
reconnect_sleep = 0.001
|
||||
reconnect_sleep = 0.001
|
||||
|
@ -1,6 +1,7 @@
|
||||
[DEFAULT]
|
||||
pooling = True
|
||||
admin_mode = True
|
||||
unreliable = True
|
||||
|
||||
[drivers]
|
||||
transport = wsgi
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
import ddt
|
||||
|
||||
from zaqar.queues import bootstrap
|
||||
from zaqar.queues.storage import utils
|
||||
from zaqar import tests as testing
|
||||
|
||||
@ -22,9 +23,15 @@ from zaqar import tests as testing
|
||||
@ddt.ddt
|
||||
class TestUtils(testing.TestBase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestUtils, self).setUp()
|
||||
self.conf.register_opts(bootstrap._GENERAL_OPTIONS)
|
||||
|
||||
@testing.requires_mongodb
|
||||
def test_can_connect_suceeds_if_good_uri_mongo(self):
|
||||
self.assertTrue(utils.can_connect('mongodb://localhost:27017'))
|
||||
self.config(unreliable=True)
|
||||
self.assertTrue(utils.can_connect('mongodb://localhost:27017',
|
||||
conf=self.conf))
|
||||
|
||||
@testing.requires_redis
|
||||
def test_can_connect_suceeds_if_good_uri_redis(self):
|
||||
@ -39,8 +46,11 @@ class TestUtils(testing.TestBase):
|
||||
|
||||
@testing.requires_mongodb
|
||||
def test_can_connect_fails_if_bad_uri_mongodb(self):
|
||||
self.assertFalse(utils.can_connect('mongodb://localhost:8080'))
|
||||
self.assertFalse(utils.can_connect('mongodb://example.com:27017'))
|
||||
self.config(unreliable=True)
|
||||
self.assertFalse(utils.can_connect('mongodb://localhost:8080',
|
||||
conf=self.conf))
|
||||
self.assertFalse(utils.can_connect('mongodb://example.com:27017',
|
||||
conf=self.conf))
|
||||
|
||||
@testing.requires_redis
|
||||
def test_can_connect_fails_if_bad_uri_redis(self):
|
||||
|
@ -26,6 +26,7 @@ import six
|
||||
from testtools import matchers
|
||||
|
||||
from zaqar.openstack.common.cache import cache as oslo_cache
|
||||
from zaqar.queues import bootstrap
|
||||
from zaqar.queues import storage
|
||||
from zaqar.queues.storage import errors
|
||||
from zaqar.queues.storage import mongodb
|
||||
@ -140,7 +141,14 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase):
|
||||
|
||||
config_file = 'wsgi_mongodb.conf'
|
||||
|
||||
def setUp(self):
|
||||
super(MongodbDriverTest, self).setUp()
|
||||
|
||||
self.conf.register_opts(bootstrap._GENERAL_OPTIONS)
|
||||
self.config(unreliable=False)
|
||||
|
||||
def test_db_instance(self):
|
||||
self.config(unreliable=True)
|
||||
cache = oslo_cache.get_cache()
|
||||
driver = mongodb.DataDriver(self.conf, cache)
|
||||
|
||||
@ -152,6 +160,7 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase):
|
||||
driver.mongodb_conf.database))
|
||||
|
||||
def test_version_match(self):
|
||||
self.config(unreliable=True)
|
||||
cache = oslo_cache.get_cache()
|
||||
|
||||
with mock.patch('pymongo.MongoClient.server_info') as info:
|
||||
@ -166,6 +175,54 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase):
|
||||
except RuntimeError:
|
||||
self.fail('version match failed')
|
||||
|
||||
def test_replicaset_or_mongos_needed(self):
|
||||
cache = oslo_cache.get_cache()
|
||||
|
||||
with mock.patch('pymongo.MongoClient.nodes') as nodes:
|
||||
nodes.__get__ = mock.Mock(return_value=[])
|
||||
with mock.patch('pymongo.MongoClient.is_mongos') as is_mongos:
|
||||
is_mongos.__get__ = mock.Mock(return_value=False)
|
||||
self.assertRaises(RuntimeError, mongodb.DataDriver,
|
||||
self.conf, cache)
|
||||
|
||||
def test_using_replset(self):
|
||||
cache = oslo_cache.get_cache()
|
||||
|
||||
with mock.patch('pymongo.MongoClient.nodes') as nodes:
|
||||
nodes.__get__ = mock.Mock(return_value=['node1', 'node2'])
|
||||
mongodb.DataDriver(self.conf, cache)
|
||||
|
||||
def test_using_mongos(self):
|
||||
cache = oslo_cache.get_cache()
|
||||
|
||||
with mock.patch('pymongo.MongoClient.is_mongos') as is_mongos:
|
||||
is_mongos.__get__ = mock.Mock(return_value=True)
|
||||
mongodb.DataDriver(self.conf, cache)
|
||||
|
||||
def test_write_concern_check_works(self):
|
||||
cache = oslo_cache.get_cache()
|
||||
|
||||
with mock.patch('pymongo.MongoClient.is_mongos') as is_mongos:
|
||||
is_mongos.__get__ = mock.Mock(return_value=True)
|
||||
|
||||
with mock.patch('pymongo.MongoClient.write_concern') as wc:
|
||||
wc.__get__ = mock.Mock(return_value={'w': 1})
|
||||
self.assertRaises(RuntimeError, mongodb.DataDriver,
|
||||
self.conf, cache)
|
||||
|
||||
wc.__get__ = mock.Mock(return_value={'w': 2})
|
||||
mongodb.DataDriver(self.conf, cache)
|
||||
|
||||
def test_write_concern_is_set(self):
|
||||
cache = oslo_cache.get_cache()
|
||||
|
||||
with mock.patch('pymongo.MongoClient.is_mongos') as is_mongos:
|
||||
is_mongos.__get__ = mock.Mock(return_value=True)
|
||||
driver = mongodb.DataDriver(self.conf, cache)
|
||||
wc = driver.connection.write_concern
|
||||
self.assertEqual(wc['w'], 'majority')
|
||||
self.assertEqual(wc['j'], False)
|
||||
|
||||
|
||||
@testing.requires_mongodb
|
||||
class MongodbQueueTests(MongodbSetupMixin, base.QueueControllerTest):
|
||||
|
@ -27,17 +27,19 @@ from zaqar import tests as testing
|
||||
@testing.requires_mongodb
|
||||
class PoolQueuesTest(testing.TestBase):
|
||||
|
||||
config_file = 'wsgi_mongodb_pooled.conf'
|
||||
|
||||
def setUp(self):
|
||||
super(PoolQueuesTest, self).setUp()
|
||||
conf = self.load_conf('wsgi_mongodb_pooled.conf')
|
||||
|
||||
conf.register_opts([cfg.StrOpt('storage')],
|
||||
group='drivers')
|
||||
self.conf.register_opts([cfg.StrOpt('storage')],
|
||||
group='drivers')
|
||||
|
||||
cache = oslo_cache.get_cache()
|
||||
control = utils.load_storage_driver(conf, cache, control_mode=True)
|
||||
control = utils.load_storage_driver(self.conf, cache,
|
||||
control_mode=True)
|
||||
self.pools_ctrl = control.pools_controller
|
||||
self.driver = pooling.DataDriver(conf, cache, control)
|
||||
self.driver = pooling.DataDriver(self.conf, cache, control)
|
||||
self.controller = self.driver.queue_controller
|
||||
|
||||
# fake two pools
|
||||
|
@ -18,6 +18,7 @@ from stevedore import driver
|
||||
|
||||
from zaqar.common import decorators
|
||||
from zaqar.common import errors
|
||||
from zaqar.i18n import _
|
||||
from zaqar.openstack.common.cache import cache as oslo_cache
|
||||
from zaqar.openstack.common import log
|
||||
from zaqar.queues.storage import pipeline
|
||||
@ -49,6 +50,9 @@ _GENERAL_OPTIONS = (
|
||||
'configuration is used to determine where the '
|
||||
'catalogue/control plane data is kept.'),
|
||||
deprecated_opts=[cfg.DeprecatedOpt('pooling')]),
|
||||
|
||||
cfg.BoolOpt('unreliable', default=None,
|
||||
help=('Disable all reliability constrains.')),
|
||||
)
|
||||
|
||||
_DRIVER_OPTIONS = (
|
||||
@ -81,6 +85,14 @@ class Bootstrap(object):
|
||||
|
||||
log.setup('zaqar')
|
||||
|
||||
if self.conf.unreliable is None:
|
||||
msg = _(u'Unreliable\'s default value will be changed to False '
|
||||
'in the Kilo release. Please, make sure your deployments '
|
||||
'are working in a reliable mode or that `unreliable` is '
|
||||
'explicitly set to `True` in your configuration files.')
|
||||
LOG.warn(msg)
|
||||
self.conf.unreliable = True
|
||||
|
||||
@decorators.lazy_property(write=False)
|
||||
def storage(self):
|
||||
LOG.debug(u'Loading storage driver')
|
||||
|
@ -72,11 +72,36 @@ class DataDriver(storage.DataDriverBase):
|
||||
|
||||
self.mongodb_conf = self.conf[options.MONGODB_GROUP]
|
||||
|
||||
server_version = self.connection.server_info()['version']
|
||||
conn = self.connection
|
||||
server_version = conn.server_info()['version']
|
||||
|
||||
if tuple(map(int, server_version.split('.'))) < (2, 2):
|
||||
raise RuntimeError(_('The mongodb driver requires mongodb>=2.2, '
|
||||
'%s found') % server_version)
|
||||
|
||||
if not len(conn.nodes) > 1 and not conn.is_mongos:
|
||||
if not self.conf.unreliable:
|
||||
raise RuntimeError(_('Either a replica set or a mongos is '
|
||||
'required to guarantee message delivery'))
|
||||
else:
|
||||
wc = conn.write_concern.get('w')
|
||||
majority = (wc == 'majority' or
|
||||
wc >= 2)
|
||||
|
||||
if not wc:
|
||||
# NOTE(flaper87): No write concern specified, use majority
|
||||
# and don't count journal as a replica. Use `update` to avoid
|
||||
# overwriting `wtimeout`
|
||||
conn.write_concern.update({'w': 'majority'})
|
||||
elif not self.conf.unreliable and not majority:
|
||||
raise RuntimeError(_('Using a write concern other than '
|
||||
'`majority` or > 2 make sthe service '
|
||||
'unreliable. Please use a different '
|
||||
'write concern or set `unreliable` '
|
||||
'to True in the config file.'))
|
||||
|
||||
conn.write_concern['j'] = False
|
||||
|
||||
def is_alive(self):
|
||||
try:
|
||||
# NOTE(zyuan): Requires admin access to mongodb
|
||||
|
@ -399,7 +399,8 @@ class Catalog(object):
|
||||
:rtype: zaqar.queues.storage.base.DataDriverBase
|
||||
"""
|
||||
pool = self._pools_ctrl.get(pool_id, detailed=True)
|
||||
conf = utils.dynamic_conf(pool['uri'], pool['options'])
|
||||
conf = utils.dynamic_conf(pool['uri'], pool['options'],
|
||||
conf=self._conf)
|
||||
return utils.load_storage_driver(conf, self._cache)
|
||||
|
||||
@decorators.caches(_pool_cache_key, _POOL_CACHE_TTL)
|
||||
|
@ -49,7 +49,7 @@ def dynamic_conf(uri, options, conf=None):
|
||||
if conf is None:
|
||||
conf = cfg.ConfigOpts()
|
||||
else:
|
||||
conf = copy.deepcopy(conf)
|
||||
conf = copy.copy(conf)
|
||||
|
||||
if storage_group not in conf:
|
||||
conf.register_opts(storage_opts,
|
||||
@ -60,6 +60,7 @@ def dynamic_conf(uri, options, conf=None):
|
||||
driver_opts = utils.dict_to_conf({'storage': storage_type})
|
||||
conf.register_opts(driver_opts, group=u'drivers')
|
||||
|
||||
conf.set_override('storage', storage_type, 'drivers')
|
||||
conf.set_override('uri', uri, group=storage_group)
|
||||
return conf
|
||||
|
||||
|
@ -150,9 +150,10 @@ class Resource(object):
|
||||
|
||||
LOG.debug(u'PUT pool - name: %s', pool)
|
||||
|
||||
conf = self._ctrl.driver.conf
|
||||
data = wsgi_utils.load(request)
|
||||
wsgi_utils.validate(self._validators['create'], data)
|
||||
if not storage_utils.can_connect(data['uri']):
|
||||
if not storage_utils.can_connect(data['uri'], conf=conf):
|
||||
raise wsgi_errors.HTTPBadRequestBody(
|
||||
'cannot connect to %s' % data['uri']
|
||||
)
|
||||
@ -199,7 +200,9 @@ class Resource(object):
|
||||
for field in EXPECT:
|
||||
wsgi_utils.validate(self._validators[field], data)
|
||||
|
||||
if 'uri' in data and not storage_utils.can_connect(data['uri']):
|
||||
conf = self._ctrl.driver.conf
|
||||
if 'uri' in data and not storage_utils.can_connect(data['uri'],
|
||||
conf=conf):
|
||||
raise wsgi_errors.HTTPBadRequestBody(
|
||||
'cannot connect to %s' % data['uri']
|
||||
)
|
||||
|
@ -152,9 +152,10 @@ class Resource(object):
|
||||
|
||||
LOG.debug(u'PUT pool - name: %s', pool)
|
||||
|
||||
conf = self._ctrl.driver.conf
|
||||
data = wsgi_utils.load(request)
|
||||
wsgi_utils.validate(self._validators['create'], data)
|
||||
if not storage_utils.can_connect(data['uri']):
|
||||
if not storage_utils.can_connect(data['uri'], conf=conf):
|
||||
raise wsgi_errors.HTTPBadRequestBody(
|
||||
'cannot connect to %s' % data['uri']
|
||||
)
|
||||
@ -212,7 +213,9 @@ class Resource(object):
|
||||
for field in EXPECT:
|
||||
wsgi_utils.validate(self._validators[field], data)
|
||||
|
||||
if 'uri' in data and not storage_utils.can_connect(data['uri']):
|
||||
conf = self._ctrl.driver.conf
|
||||
if 'uri' in data and not storage_utils.can_connect(data['uri'],
|
||||
conf=conf):
|
||||
raise wsgi_errors.HTTPBadRequestBody(
|
||||
'cannot connect to %s' % data['uri']
|
||||
)
|
||||
|
@ -20,6 +20,8 @@ from oslo.config import cfg
|
||||
import six
|
||||
import testtools
|
||||
|
||||
from zaqar.queues import bootstrap
|
||||
|
||||
|
||||
class TestBase(testtools.TestCase):
|
||||
"""Child class of testtools.TestCase for testing Zaqar.
|
||||
@ -46,6 +48,8 @@ class TestBase(testtools.TestCase):
|
||||
else:
|
||||
self.conf = cfg.ConfigOpts()
|
||||
|
||||
self.conf.register_opts(bootstrap._GENERAL_OPTIONS)
|
||||
|
||||
@classmethod
|
||||
def conf_path(cls, filename):
|
||||
"""Returns the full path to the specified Zaqar conf file.
|
||||
|
@ -24,7 +24,6 @@ import six
|
||||
from testtools import matchers
|
||||
|
||||
from zaqar.openstack.common.cache import cache as oslo_cache
|
||||
from zaqar.queues import bootstrap
|
||||
from zaqar.queues import storage
|
||||
from zaqar.queues.storage import errors
|
||||
from zaqar import tests as testing
|
||||
@ -52,7 +51,6 @@ class ControllerBaseTest(testing.TestBase):
|
||||
oslo_cache.register_oslo_configs(self.conf)
|
||||
cache = oslo_cache.get_cache(self.conf.cache_url)
|
||||
|
||||
self.conf.register_opts(bootstrap._GENERAL_OPTIONS)
|
||||
pooling = 'pooling' in self.conf and self.conf.pooling
|
||||
if pooling and not self.control_driver_class:
|
||||
self.skipTest("Pooling is enabled, "
|
||||
|
@ -33,6 +33,7 @@ class TestBase(testing.TestBase):
|
||||
if not self.config_file:
|
||||
self.skipTest("No config specified")
|
||||
|
||||
self.conf.register_opts(bootstrap._GENERAL_OPTIONS)
|
||||
self.conf.register_opts(validation._TRANSPORT_LIMITS_OPTIONS,
|
||||
group=validation._TRANSPORT_LIMITS_GROUP)
|
||||
self.transport_cfg = self.conf[validation._TRANSPORT_LIMITS_GROUP]
|
||||
@ -41,6 +42,7 @@ class TestBase(testing.TestBase):
|
||||
group=driver._WSGI_GROUP)
|
||||
self.wsgi_cfg = self.conf[driver._WSGI_GROUP]
|
||||
|
||||
self.conf.unreliable = True
|
||||
self.conf.admin_mode = True
|
||||
self.boot = bootstrap.Bootstrap(self.conf)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user