Merge "Allow collector service database connection retry"
This commit is contained in:
commit
1b1251d486
@ -38,8 +38,33 @@ class DatabaseDispatcher(dispatcher.Base):
|
|||||||
"""
|
"""
|
||||||
def __init__(self, conf):
|
def __init__(self, conf):
|
||||||
super(DatabaseDispatcher, self).__init__(conf)
|
super(DatabaseDispatcher, self).__init__(conf)
|
||||||
self.meter_conn = storage.get_connection_from_config(conf, 'metering')
|
|
||||||
self.event_conn = storage.get_connection_from_config(conf, 'event')
|
self._meter_conn = self._get_db_conn('metering', True)
|
||||||
|
self._event_conn = self._get_db_conn('event', True)
|
||||||
|
|
||||||
|
def _get_db_conn(self, purpose, ignore_exception=False):
|
||||||
|
try:
|
||||||
|
return storage.get_connection_from_config(self.conf, purpose)
|
||||||
|
except Exception as err:
|
||||||
|
params = {"purpose": purpose, "err": err}
|
||||||
|
LOG.exception(_("Failed to connect to db, purpose %(purpose)s "
|
||||||
|
"re-try later: %(err)s") % params)
|
||||||
|
if not ignore_exception:
|
||||||
|
raise
|
||||||
|
|
||||||
|
@property
|
||||||
|
def meter_conn(self):
|
||||||
|
if not self._meter_conn:
|
||||||
|
self._meter_conn = self._get_db_conn('metering')
|
||||||
|
|
||||||
|
return self._meter_conn
|
||||||
|
|
||||||
|
@property
|
||||||
|
def event_conn(self):
|
||||||
|
if not self._event_conn:
|
||||||
|
self._event_conn = self._get_db_conn('event')
|
||||||
|
|
||||||
|
return self._event_conn
|
||||||
|
|
||||||
def record_metering_data(self, data):
|
def record_metering_data(self, data):
|
||||||
# We may have receive only one counter on the wire
|
# We may have receive only one counter on the wire
|
||||||
|
@ -19,6 +19,7 @@
|
|||||||
|
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
from oslo.db import options as db_options
|
from oslo.db import options as db_options
|
||||||
|
import retrying
|
||||||
import six
|
import six
|
||||||
import six.moves.urllib.parse as urlparse
|
import six.moves.urllib.parse as urlparse
|
||||||
from stevedore import driver
|
from stevedore import driver
|
||||||
@ -74,6 +75,11 @@ class StorageBadAggregate(Exception):
|
|||||||
code = 400
|
code = 400
|
||||||
|
|
||||||
|
|
||||||
|
# Convert retry_interval secs to msecs for retry decorator
|
||||||
|
@retrying.retry(wait_fixed=cfg.CONF.database.retry_interval * 1000,
|
||||||
|
stop_max_attempt_number=cfg.CONF.database.max_retries
|
||||||
|
if cfg.CONF.database.max_retries >= 0
|
||||||
|
else None)
|
||||||
def get_connection_from_config(conf, purpose=None):
|
def get_connection_from_config(conf, purpose=None):
|
||||||
if conf.database_connection:
|
if conf.database_connection:
|
||||||
conf.set_override('connection', conf.database_connection,
|
conf.set_override('connection', conf.database_connection,
|
||||||
@ -83,6 +89,9 @@ def get_connection_from_config(conf, purpose=None):
|
|||||||
if purpose:
|
if purpose:
|
||||||
namespace = 'ceilometer.%s.storage' % purpose
|
namespace = 'ceilometer.%s.storage' % purpose
|
||||||
url = getattr(conf.database, '%s_connection' % purpose) or url
|
url = getattr(conf.database, '%s_connection' % purpose) or url
|
||||||
|
# Set max_retries to 0, since oslo.db in certain cases may attempt to retry
|
||||||
|
# making the db connection retried max_retries ^ 2 times in failure case
|
||||||
|
conf.set_override('max_retries', 0, group='database')
|
||||||
return get_connection(url, namespace)
|
return get_connection(url, namespace)
|
||||||
|
|
||||||
|
|
||||||
|
@ -19,8 +19,6 @@
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
import time
|
|
||||||
|
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
from oslo.utils import netutils
|
from oslo.utils import netutils
|
||||||
import pymongo
|
import pymongo
|
||||||
@ -179,26 +177,12 @@ class ConnectionPool(object):
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _mongo_connect(url):
|
def _mongo_connect(url):
|
||||||
max_retries = cfg.CONF.database.max_retries
|
try:
|
||||||
retry_interval = cfg.CONF.database.retry_interval
|
return pymongo.MongoClient(url, safe=True)
|
||||||
attempts = 0
|
except pymongo.errors.ConnectionFailure as e:
|
||||||
while True:
|
LOG.warn(_('Unable to connect to the database server: '
|
||||||
try:
|
'%(errmsg)s.') % {'errmsg': e})
|
||||||
client = pymongo.MongoClient(url, safe=True)
|
raise
|
||||||
except pymongo.errors.ConnectionFailure as e:
|
|
||||||
if 0 <= max_retries <= attempts:
|
|
||||||
LOG.error(_('Unable to connect to the database after '
|
|
||||||
'%(retries)d retries. Giving up.') %
|
|
||||||
{'retries': max_retries})
|
|
||||||
raise
|
|
||||||
LOG.warn(_('Unable to connect to the database server: '
|
|
||||||
'%(errmsg)s. Trying again in %(retry_interval)d '
|
|
||||||
'seconds.') %
|
|
||||||
{'errmsg': e, 'retry_interval': retry_interval})
|
|
||||||
attempts += 1
|
|
||||||
time.sleep(retry_interval)
|
|
||||||
else:
|
|
||||||
return client
|
|
||||||
|
|
||||||
|
|
||||||
class QueryTransformer(object):
|
class QueryTransformer(object):
|
||||||
|
@ -73,7 +73,7 @@ class TestDispatcherDB(base.BaseTestCase):
|
|||||||
def record_metering_data(self, data):
|
def record_metering_data(self, data):
|
||||||
self.called = True
|
self.called = True
|
||||||
|
|
||||||
self.dispatcher.meter_conn = ErrorConnection()
|
self.dispatcher._meter_conn = ErrorConnection()
|
||||||
|
|
||||||
self.dispatcher.record_metering_data(msg)
|
self.dispatcher.record_metering_data(msg)
|
||||||
|
|
||||||
|
@ -16,8 +16,10 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
"""Tests for ceilometer/storage/
|
"""Tests for ceilometer/storage/
|
||||||
"""
|
"""
|
||||||
|
import mock
|
||||||
from oslo.config import fixture as fixture_config
|
from oslo.config import fixture as fixture_config
|
||||||
from oslotest import base
|
from oslotest import base
|
||||||
|
import retrying
|
||||||
|
|
||||||
from ceilometer.alarm.storage import impl_log as impl_log_alarm
|
from ceilometer.alarm.storage import impl_log as impl_log_alarm
|
||||||
from ceilometer.alarm.storage import impl_sqlalchemy as impl_sqlalchemy_alarm
|
from ceilometer.alarm.storage import impl_sqlalchemy as impl_sqlalchemy_alarm
|
||||||
@ -30,7 +32,6 @@ import six
|
|||||||
|
|
||||||
|
|
||||||
class EngineTest(base.BaseTestCase):
|
class EngineTest(base.BaseTestCase):
|
||||||
|
|
||||||
def test_get_connection(self):
|
def test_get_connection(self):
|
||||||
engine = storage.get_connection('log://localhost',
|
engine = storage.get_connection('log://localhost',
|
||||||
'ceilometer.metering.storage')
|
'ceilometer.metering.storage')
|
||||||
@ -44,6 +45,23 @@ class EngineTest(base.BaseTestCase):
|
|||||||
self.assertIn('no-such-engine', six.text_type(err))
|
self.assertIn('no-such-engine', six.text_type(err))
|
||||||
|
|
||||||
|
|
||||||
|
class ConnectionRetryTest(base.BaseTestCase):
|
||||||
|
def setUp(self):
|
||||||
|
super(ConnectionRetryTest, self).setUp()
|
||||||
|
self.CONF = self.useFixture(fixture_config.Config()).conf
|
||||||
|
|
||||||
|
def test_retries(self):
|
||||||
|
with mock.patch.object(retrying.time, 'sleep') as retry_sleep:
|
||||||
|
try:
|
||||||
|
self.CONF.set_override("connection", "no-such-engine://",
|
||||||
|
group="database")
|
||||||
|
storage.get_connection_from_config(self.CONF)
|
||||||
|
except RuntimeError as err:
|
||||||
|
self.assertIn('no-such-engine', six.text_type(err))
|
||||||
|
self.assertEqual(retry_sleep.call_count, 9)
|
||||||
|
retry_sleep.assert_called_with(10.0)
|
||||||
|
|
||||||
|
|
||||||
class ConnectionConfigTest(base.BaseTestCase):
|
class ConnectionConfigTest(base.BaseTestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(ConnectionConfigTest, self).setUp()
|
super(ConnectionConfigTest, self).setUp()
|
||||||
|
@ -12,17 +12,13 @@
|
|||||||
"""Tests the mongodb and db2 common functionality
|
"""Tests the mongodb and db2 common functionality
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import contextlib
|
|
||||||
import copy
|
import copy
|
||||||
import datetime
|
import datetime
|
||||||
|
|
||||||
import mock
|
import mock
|
||||||
import pymongo
|
|
||||||
|
|
||||||
from ceilometer.openstack.common.gettextutils import _
|
|
||||||
from ceilometer.publisher import utils
|
from ceilometer.publisher import utils
|
||||||
from ceilometer import sample
|
from ceilometer import sample
|
||||||
from ceilometer.storage.mongo import utils as pymongo_utils
|
|
||||||
from ceilometer.tests import db as tests_db
|
from ceilometer.tests import db as tests_db
|
||||||
from ceilometer.tests.storage import test_storage_scenarios
|
from ceilometer.tests.storage import test_storage_scenarios
|
||||||
|
|
||||||
@ -168,41 +164,3 @@ class CompatibilityTest(test_storage_scenarios.DBTestBase,
|
|||||||
def test_counter_unit(self):
|
def test_counter_unit(self):
|
||||||
meters = list(self.conn.get_meters())
|
meters = list(self.conn.get_meters())
|
||||||
self.assertEqual(1, len(meters))
|
self.assertEqual(1, len(meters))
|
||||||
|
|
||||||
def test_mongodb_connect_raises_after_custom_number_of_attempts(self):
|
|
||||||
retry_interval = 13
|
|
||||||
max_retries = 37
|
|
||||||
self.CONF.set_override(
|
|
||||||
'retry_interval', retry_interval, group='database')
|
|
||||||
self.CONF.set_override(
|
|
||||||
'max_retries', max_retries, group='database')
|
|
||||||
# PyMongo is being used to connect even to DB2, but it only
|
|
||||||
# accepts URLs with the 'mongodb' scheme. This replacement is
|
|
||||||
# usually done in the DB2 connection implementation, but since
|
|
||||||
# we don't call that, we have to do it here.
|
|
||||||
self.CONF.set_override(
|
|
||||||
'connection', self.db_manager.url.replace('db2:', 'mongodb:', 1),
|
|
||||||
group='database')
|
|
||||||
|
|
||||||
pool = pymongo_utils.ConnectionPool()
|
|
||||||
with contextlib.nested(
|
|
||||||
mock.patch(
|
|
||||||
'pymongo.MongoClient',
|
|
||||||
side_effect=pymongo.errors.ConnectionFailure('foo')),
|
|
||||||
mock.patch.object(pymongo_utils.LOG, 'error'),
|
|
||||||
mock.patch.object(pymongo_utils.LOG, 'warn'),
|
|
||||||
mock.patch.object(pymongo_utils.time, 'sleep')
|
|
||||||
) as (MockMongo, MockLOGerror, MockLOGwarn, Mocksleep):
|
|
||||||
self.assertRaises(pymongo.errors.ConnectionFailure,
|
|
||||||
pool.connect, self.CONF.database.connection)
|
|
||||||
Mocksleep.assert_has_calls([mock.call(retry_interval)
|
|
||||||
for i in range(max_retries)])
|
|
||||||
MockLOGwarn.assert_any_call(
|
|
||||||
_('Unable to connect to the database server: %(errmsg)s.'
|
|
||||||
' Trying again in %(retry_interval)d seconds.') %
|
|
||||||
{'errmsg': 'foo',
|
|
||||||
'retry_interval': retry_interval})
|
|
||||||
MockLOGerror.assert_called_with(
|
|
||||||
_('Unable to connect to the database after '
|
|
||||||
'%(retries)d retries. Giving up.') %
|
|
||||||
{'retries': max_retries})
|
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
# of appearance. Changing the order has an impact on the overall integration
|
# of appearance. Changing the order has an impact on the overall integration
|
||||||
# process, which may cause wedges in the gate later.
|
# process, which may cause wedges in the gate later.
|
||||||
|
|
||||||
|
retrying>=1.2.2,!=1.3.0
|
||||||
alembic>=0.6.4
|
alembic>=0.6.4
|
||||||
anyjson>=0.3.3
|
anyjson>=0.3.3
|
||||||
argparse
|
argparse
|
||||||
|
Loading…
Reference in New Issue
Block a user