[MongoDB] Fix bug with reconnection to new master node

Fixes bug with raising AutoReconnect exception when MongoDB ReplicaSet
loses connection to primary node.

Change-Id: Id0e81ba60b28d09adff6a10d04b412f25257d8ce
Closes-Bug: #1309555
This commit is contained in:
Igor Degtiarov 2014-10-23 14:05:38 +03:00
parent 1b1251d486
commit 21d882c96c
9 changed files with 195 additions and 7 deletions

View File

@ -73,5 +73,5 @@ class Connection(pymongo_base.Connection):
# not been implemented. However calling this method is important for
# removal of all the empty dbs created during the test runs since
# test run is against mongodb on Jenkins
self.conn.drop_database(self.db)
self.conn.drop_database(self.db.name)
self.conn.close()

View File

@ -63,6 +63,6 @@ class Connection(pymongo_base.Connection):
self.upgrade()
def clear(self):
self.conn.drop_database(self.db)
self.conn.drop_database(self.db.name)
# Connection will be reopened automatically if needed
self.conn.close()

View File

@ -60,5 +60,5 @@ class Connection(pymongo_base.Connection):
# not been implemented. However calling this method is important for
# removal of all the empty dbs created during the test runs since
# test run is against mongodb on Jenkins
self.conn.drop_database(self.db)
self.conn.drop_database(self.db.name)
self.conn.close()

View File

@ -47,6 +47,6 @@ class Connection(pymongo_base.Connection):
self.upgrade()
def clear(self):
self.conn.drop_database(self.db)
self.conn.drop_database(self.db.name)
# Connection will be reopened automatically if needed
self.conn.close()

View File

@ -58,6 +58,10 @@ OPTS = [
default=None,
help='The connection string used to connect to the event '
'database. (if unset, connection is used)'),
cfg.StrOpt('mongodb_replica_set',
default='',
help="The connection string used to connect to mongo database, "
"if mongodb replica set was chosen."),
]
cfg.CONF.register_opts(OPTS, group='database')

View File

@ -198,7 +198,7 @@ class Connection(pymongo_base.Connection):
# not been implemented. However calling this method is important for
# removal of all the empty dbs created during the test runs since
# test run is against mongodb on Jenkins
self.conn.drop_database(self.db)
self.conn.drop_database(self.db.name)
self.conn.close()
def record_metering_data(self, data):

View File

@ -470,7 +470,7 @@ class Connection(pymongo_base.Connection):
)
def clear(self):
self.conn.drop_database(self.db)
self.conn.drop_database(self.db.name)
# Connection will be reopened automatically if needed
self.conn.close()

View File

@ -18,6 +18,7 @@
"""Common functions for MongoDB and DB2 backends
"""
import time
from oslo.config import cfg
from oslo.utils import netutils
@ -178,7 +179,16 @@ class ConnectionPool(object):
@staticmethod
def _mongo_connect(url):
try:
return pymongo.MongoClient(url, safe=True)
if cfg.CONF.database.mongodb_replica_set:
client = MongoProxy(
Prefection(
pymongo.MongoReplicaSetClient(
url,
replicaSet=cfg.CONF.database.mongodb_replica_set)))
else:
client = MongoProxy(
Prefection(pymongo.MongoClient(url, safe=True)))
return client
except pymongo.errors.ConnectionFailure as e:
LOG.warn(_('Unable to connect to the database server: '
'%(errmsg)s.') % {'errmsg': e})
@ -305,3 +315,89 @@ class QueryTransformer(object):
return self._handle_not_op(negated_tree)
return self._handle_simple_op(operator_node, nodes)
def safe_mongo_call(call):
def closure(*args, **kwargs):
max_retries = cfg.CONF.database.max_retries
retry_interval = cfg.CONF.database.retry_interval
attempts = 0
while True:
try:
return call(*args, **kwargs)
except pymongo.errors.AutoReconnect as err:
if 0 <= max_retries <= attempts:
LOG.error(_('Unable to reconnect to the primary mongodb '
'after %(retries)d retries. Giving up.') %
{'retries': max_retries})
raise
LOG.warn(_('Unable to reconnect to the primary mongodb: '
'%(errmsg)s. Trying again in %(retry_interval)d '
'seconds.') %
{'errmsg': err, 'retry_interval': retry_interval})
attempts += 1
time.sleep(retry_interval)
return closure
class MongoConn(object):
def __init__(self, method):
self.method = method
@safe_mongo_call
def __call__(self, *args, **kwargs):
return self.method(*args, **kwargs)
MONGO_METHODS = set([typ for typ in dir(pymongo.collection.Collection)
if not typ.startswith('_')])
MONGO_METHODS.update(set([typ for typ in dir(pymongo.MongoClient)
if not typ.startswith('_')]))
MONGO_METHODS.update(set([typ for typ in dir(pymongo)
if not typ.startswith('_')]))
class MongoProxy(object):
def __init__(self, conn):
self.conn = conn
def __getitem__(self, item):
"""Create and return proxy around the method in the connection.
:param item: name of the connection
"""
return MongoProxy(self.conn[item])
def __getattr__(self, item):
"""Wrap MongoDB connection.
If item is the name of an executable method, for example find or
insert, wrap this method in the MongoConn.
Else wrap getting attribute with MongoProxy.
"""
if item == 'name':
return getattr(self.conn, item)
if item in MONGO_METHODS:
return MongoConn(getattr(self.conn, item))
return MongoProxy(getattr(self.conn, item))
def __call__(self, *args, **kwargs):
return self.conn(*args, **kwargs)
class Prefection(pymongo.collection.Collection):
def __init__(self, conn):
self.conn = conn
def find(self, *args, **kwargs):
# We need this modifying method to check a connection for MongoDB
# in context of MongoProxy approach. Initially 'find' returns Cursor
# object and doesn't connect to db while Cursor is not used.
found = self.find(*args, **kwargs)
try:
found[0]
except IndexError:
pass
return found
def __getattr__(self, item):
return getattr(self.conn, item)

View File

@ -23,7 +23,9 @@ import datetime
import operator
import mock
from oslo.config import cfg
from oslo.utils import timeutils
import pymongo
import ceilometer
from ceilometer.alarm.storage import models as alarm_models
@ -3101,3 +3103,89 @@ class BigIntegerTest(tests_db.TestBase,
msg = utils.meter_message_from_counter(
s, self.CONF.publisher.metering_secret)
self.conn.record_metering_data(msg)
class MongoAutoReconnectTest(DBTestBase,
tests_db.MixinTestsWithBackendScenarios):
cfg.CONF.set_override('retry_interval', 1, group='database')
@tests_db.run_with('mongodb')
def test_mongo_client(self):
if cfg.CONF.database.mongodb_replica_set:
self.assertIsInstance(self.conn.conn.conn.conn,
pymongo.MongoReplicaSetClient)
else:
self.assertIsInstance(self.conn.conn.conn.conn,
pymongo.MongoClient)
@staticmethod
def create_side_effect(method, test_exception):
def side_effect(*args, **kwargs):
if test_exception.pop():
raise pymongo.errors.AutoReconnect
else:
return method(*args, **kwargs)
return side_effect
@tests_db.run_with('mongodb')
def test_mongo_find(self):
raise_exc = [False, True]
method = self.conn.db.resource.find
with mock.patch('pymongo.collection.Collection.find',
mock.Mock()) as mock_find:
mock_find.side_effect = self.create_side_effect(method, raise_exc)
mock_find.__name__ = 'find'
resources = list(self.conn.get_resources())
self.assertEqual(9, len(resources))
@tests_db.run_with('mongodb')
def test_mongo_insert(self):
raise_exc = [False, True]
method = self.conn.db.meter.insert
with mock.patch('pymongo.collection.Collection.insert',
mock.Mock(return_value=method)) as mock_insert:
mock_insert.side_effect = self.create_side_effect(method,
raise_exc)
mock_insert.__name__ = 'insert'
self.create_and_store_sample(
timestamp=datetime.datetime(2014, 10, 15, 14, 39),
source='test-proxy')
meters = list(self.conn.db.meter.find())
self.assertEqual(12, len(meters))
@tests_db.run_with('mongodb')
def test_mongo_find_and_modify(self):
raise_exc = [False, True]
method = self.conn.db.resource.find_and_modify
with mock.patch('pymongo.collection.Collection.find_and_modify',
mock.Mock()) as mock_fam:
mock_fam.side_effect = self.create_side_effect(method, raise_exc)
mock_fam.__name__ = 'find_and_modify'
self.create_and_store_sample(
timestamp=datetime.datetime(2014, 10, 15, 14, 39),
source='test-proxy')
data = self.conn.db.resource.find(
{'last_sample_timestamp':
datetime.datetime(2014, 10, 15, 14, 39)})[0]['source']
self.assertEqual('test-proxy', data)
@tests_db.run_with('mongodb')
def test_mongo_update(self):
raise_exc = [False, True]
method = self.conn.db.resource.update
with mock.patch('pymongo.collection.Collection.update',
mock.Mock()) as mock_update:
mock_update.side_effect = self.create_side_effect(method,
raise_exc)
mock_update.__name__ = 'update'
self.create_and_store_sample(
timestamp=datetime.datetime(2014, 10, 15, 17, 39),
source='test-proxy-update')
data = self.conn.db.resource.find(
{'last_sample_timestamp':
datetime.datetime(2014, 10, 15, 17, 39)})[0]['source']
self.assertEqual('test-proxy-update', data)