Using oslo.db retry decorator for sample create
Decorated record_metering_data method of impl_sqlalchemy with oslo.db.api.wrap_db_retry decorator to retry on deadlock errors. This will allow for configurable number of retry attempts when a deadlock occurs while creating a sample. Change-Id: I528ea4576f1daccae8fb53393b3c48ae45c509f5 Closes-Bug: 1432914
This commit is contained in:
parent
6d5e5e6bda
commit
9793fe7fa0
@ -18,6 +18,7 @@ import datetime
|
|||||||
import hashlib
|
import hashlib
|
||||||
import os
|
import os
|
||||||
|
|
||||||
|
from oslo.db import api
|
||||||
from oslo.db import exception as dbexc
|
from oslo.db import exception as dbexc
|
||||||
from oslo.db.sqlalchemy import session as db_session
|
from oslo.db.sqlalchemy import session as db_session
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
@ -316,6 +317,9 @@ class Connection(base.Connection):
|
|||||||
|
|
||||||
return internal_id
|
return internal_id
|
||||||
|
|
||||||
|
@api.wrap_db_retry(retry_interval=cfg.CONF.database.retry_interval,
|
||||||
|
max_retries=cfg.CONF.database.max_retries,
|
||||||
|
retry_on_deadlock=True)
|
||||||
def record_metering_data(self, data):
|
def record_metering_data(self, data):
|
||||||
"""Write the data to the backend storage system.
|
"""Write the data to the backend storage system.
|
||||||
|
|
||||||
|
@ -19,6 +19,8 @@ import datetime
|
|||||||
import operator
|
import operator
|
||||||
|
|
||||||
import mock
|
import mock
|
||||||
|
from oslo.db import api
|
||||||
|
from oslo.db import exception as dbexc
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_utils import timeutils
|
from oslo_utils import timeutils
|
||||||
import pymongo
|
import pymongo
|
||||||
@ -35,6 +37,15 @@ from ceilometer.tests import db as tests_db
|
|||||||
|
|
||||||
|
|
||||||
class DBTestBase(tests_db.TestBase):
|
class DBTestBase(tests_db.TestBase):
|
||||||
|
@staticmethod
|
||||||
|
def create_side_effect(method, exception_type, test_exception):
|
||||||
|
def side_effect(*args, **kwargs):
|
||||||
|
if test_exception.pop():
|
||||||
|
raise exception_type
|
||||||
|
else:
|
||||||
|
return method(*args, **kwargs)
|
||||||
|
return side_effect
|
||||||
|
|
||||||
def create_and_store_sample(self, timestamp=datetime.datetime.utcnow(),
|
def create_and_store_sample(self, timestamp=datetime.datetime.utcnow(),
|
||||||
metadata=None,
|
metadata=None,
|
||||||
name='instance',
|
name='instance',
|
||||||
@ -697,6 +708,65 @@ class RawSampleTest(DBTestBase,
|
|||||||
results = list(self.conn.get_resources())
|
results = list(self.conn.get_resources())
|
||||||
self.assertEqual(6, len(results))
|
self.assertEqual(6, len(results))
|
||||||
|
|
||||||
|
@tests_db.run_with('sqlite', 'mysql', 'pgsql')
|
||||||
|
def test_record_metering_data_retry_success_on_deadlock(self):
|
||||||
|
raise_deadlock = [False, True]
|
||||||
|
self.CONF.set_override('max_retries', 2, group='database')
|
||||||
|
|
||||||
|
s = sample.Sample('instance', sample.TYPE_CUMULATIVE, unit='',
|
||||||
|
volume=1, user_id='user_id',
|
||||||
|
project_id='project_id',
|
||||||
|
resource_id='resource_id',
|
||||||
|
timestamp=datetime.datetime.utcnow(),
|
||||||
|
resource_metadata={'display_name': 'test-server',
|
||||||
|
'tag': 'self.counter'},
|
||||||
|
source=None)
|
||||||
|
|
||||||
|
msg = utils.meter_message_from_counter(
|
||||||
|
s, self.CONF.publisher.telemetry_secret
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_resource_create = mock.patch.object(self.conn, "_create_resource")
|
||||||
|
|
||||||
|
mock_resource_create.side_effect = self.create_side_effect(
|
||||||
|
self.conn._create_resource, dbexc.DBDeadlock, raise_deadlock)
|
||||||
|
with mock.patch.object(api.time, 'sleep') as retry_sleep:
|
||||||
|
self.conn.record_metering_data(msg)
|
||||||
|
self.assertEqual(1, retry_sleep.call_count)
|
||||||
|
|
||||||
|
f = storage.SampleFilter(meter='instance')
|
||||||
|
results = list(self.conn.get_samples(f))
|
||||||
|
self.assertEqual(13, len(results))
|
||||||
|
|
||||||
|
@tests_db.run_with('sqlite', 'mysql', 'pgsql')
|
||||||
|
def test_record_metering_data_retry_failure_on_deadlock(self):
|
||||||
|
raise_deadlock = [True, True, True]
|
||||||
|
self.CONF.set_override('max_retries', 3, group='database')
|
||||||
|
|
||||||
|
s = sample.Sample('instance', sample.TYPE_CUMULATIVE, unit='',
|
||||||
|
volume=1, user_id='user_id',
|
||||||
|
project_id='project_id',
|
||||||
|
resource_id='resource_id',
|
||||||
|
timestamp=datetime.datetime.utcnow(),
|
||||||
|
resource_metadata={'display_name': 'test-server',
|
||||||
|
'tag': 'self.counter'},
|
||||||
|
source=None)
|
||||||
|
|
||||||
|
msg = utils.meter_message_from_counter(
|
||||||
|
s, self.CONF.publisher.telemetry_secret
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_resource_create = mock.patch.object(self.conn, "_create_resource")
|
||||||
|
|
||||||
|
mock_resource_create.side_effect = self.create_side_effect(
|
||||||
|
self.conn._create_resource, dbexc.DBDeadlock, raise_deadlock)
|
||||||
|
with mock.patch.object(api.time, 'sleep') as retry_sleep:
|
||||||
|
try:
|
||||||
|
self.conn.record_metering_data(msg)
|
||||||
|
except dbexc.DBError as err:
|
||||||
|
self.assertIn('DBDeadlock', str(type(err)))
|
||||||
|
self.assertEqual(3, retry_sleep.call_count)
|
||||||
|
|
||||||
@tests_db.run_with('sqlite', 'mysql', 'pgsql', 'hbase', 'db2')
|
@tests_db.run_with('sqlite', 'mysql', 'pgsql', 'hbase', 'db2')
|
||||||
def test_clear_metering_data_with_alarms(self):
|
def test_clear_metering_data_with_alarms(self):
|
||||||
# NOTE(jd) Override this test in MongoDB because our code doesn't clear
|
# NOTE(jd) Override this test in MongoDB because our code doesn't clear
|
||||||
@ -3590,22 +3660,14 @@ class MongoAutoReconnectTest(DBTestBase,
|
|||||||
self.assertIsInstance(self.conn.conn.conn,
|
self.assertIsInstance(self.conn.conn.conn,
|
||||||
pymongo.MongoClient)
|
pymongo.MongoClient)
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def create_side_effect(method, test_exception):
|
|
||||||
def side_effect(*args, **kwargs):
|
|
||||||
if test_exception.pop():
|
|
||||||
raise pymongo.errors.AutoReconnect
|
|
||||||
else:
|
|
||||||
return method(*args, **kwargs)
|
|
||||||
return side_effect
|
|
||||||
|
|
||||||
def test_mongo_cursor_next(self):
|
def test_mongo_cursor_next(self):
|
||||||
expected_first_sample_timestamp = datetime.datetime(2012, 7, 2, 10, 39)
|
expected_first_sample_timestamp = datetime.datetime(2012, 7, 2, 10, 39)
|
||||||
raise_exc = [False, True]
|
raise_exc = [False, True]
|
||||||
method = self.conn.db.resource.find().cursor.next
|
method = self.conn.db.resource.find().cursor.next
|
||||||
with mock.patch('pymongo.cursor.Cursor.next',
|
with mock.patch('pymongo.cursor.Cursor.next',
|
||||||
mock.Mock()) as mock_next:
|
mock.Mock()) as mock_next:
|
||||||
mock_next.side_effect = self.create_side_effect(method, raise_exc)
|
mock_next.side_effect = self.create_side_effect(
|
||||||
|
method, pymongo.errors.AutoReconnect, raise_exc)
|
||||||
resource = self.conn.db.resource.find().next()
|
resource = self.conn.db.resource.find().next()
|
||||||
self.assertEqual(expected_first_sample_timestamp,
|
self.assertEqual(expected_first_sample_timestamp,
|
||||||
resource['first_sample_timestamp'])
|
resource['first_sample_timestamp'])
|
||||||
@ -3616,8 +3678,8 @@ class MongoAutoReconnectTest(DBTestBase,
|
|||||||
|
|
||||||
with mock.patch('pymongo.collection.Collection.insert',
|
with mock.patch('pymongo.collection.Collection.insert',
|
||||||
mock.Mock(return_value=method)) as mock_insert:
|
mock.Mock(return_value=method)) as mock_insert:
|
||||||
mock_insert.side_effect = self.create_side_effect(method,
|
mock_insert.side_effect = self.create_side_effect(
|
||||||
raise_exc)
|
method, pymongo.errors.AutoReconnect, raise_exc)
|
||||||
mock_insert.__name__ = 'insert'
|
mock_insert.__name__ = 'insert'
|
||||||
self.create_and_store_sample(
|
self.create_and_store_sample(
|
||||||
timestamp=datetime.datetime(2014, 10, 15, 14, 39),
|
timestamp=datetime.datetime(2014, 10, 15, 14, 39),
|
||||||
@ -3631,7 +3693,8 @@ class MongoAutoReconnectTest(DBTestBase,
|
|||||||
|
|
||||||
with mock.patch('pymongo.collection.Collection.find_and_modify',
|
with mock.patch('pymongo.collection.Collection.find_and_modify',
|
||||||
mock.Mock()) as mock_fam:
|
mock.Mock()) as mock_fam:
|
||||||
mock_fam.side_effect = self.create_side_effect(method, raise_exc)
|
mock_fam.side_effect = self.create_side_effect(
|
||||||
|
method, pymongo.errors.AutoReconnect, raise_exc)
|
||||||
mock_fam.__name__ = 'find_and_modify'
|
mock_fam.__name__ = 'find_and_modify'
|
||||||
self.create_and_store_sample(
|
self.create_and_store_sample(
|
||||||
timestamp=datetime.datetime(2014, 10, 15, 14, 39),
|
timestamp=datetime.datetime(2014, 10, 15, 14, 39),
|
||||||
@ -3647,8 +3710,8 @@ class MongoAutoReconnectTest(DBTestBase,
|
|||||||
|
|
||||||
with mock.patch('pymongo.collection.Collection.update',
|
with mock.patch('pymongo.collection.Collection.update',
|
||||||
mock.Mock()) as mock_update:
|
mock.Mock()) as mock_update:
|
||||||
mock_update.side_effect = self.create_side_effect(method,
|
mock_update.side_effect = self.create_side_effect(
|
||||||
raise_exc)
|
method, pymongo.errors.AutoReconnect, raise_exc)
|
||||||
mock_update.__name__ = 'update'
|
mock_update.__name__ = 'update'
|
||||||
self.create_and_store_sample(
|
self.create_and_store_sample(
|
||||||
timestamp=datetime.datetime(2014, 10, 15, 17, 39),
|
timestamp=datetime.datetime(2014, 10, 15, 17, 39),
|
||||||
|
Loading…
x
Reference in New Issue
Block a user