Merge "Refactor alarm scenario tests (TestAlarmsBase)"

This commit is contained in:
Jenkins 2015-07-16 10:20:42 +00:00 committed by Gerrit Code Review
commit 233e76afae

View File

@ -32,21 +32,85 @@ from aodh.tests import constants
from aodh.tests import db as tests_db
class TestListEmptyAlarms(v2.FunctionalTest,
class TestAlarmsBase(v2.FunctionalTest,
tests_db.MixinTestsWithBackendScenarios):
def setUp(self):
super(TestAlarmsBase, self).setUp()
self.auth_headers = {'X-User-Id': str(uuid.uuid4()),
'X-Project-Id': str(uuid.uuid4())}
@staticmethod
def _add_default_threshold_rule(alarm):
if (alarm['type'] == 'threshold' and
'exclude_outliers' not in alarm['threshold_rule']):
alarm['threshold_rule']['exclude_outliers'] = False
def _verify_alarm(self, json, alarm, expected_name=None):
if expected_name and alarm.name != expected_name:
self.fail("Alarm not found")
self._add_default_threshold_rule(json)
for key in json:
if key.endswith('_rule'):
storage_key = 'rule'
else:
storage_key = key
self.assertEqual(json[key], getattr(alarm, storage_key))
def _get_alarm(self, id):
data = self.get_json('/alarms')
match = [a for a in data if a['alarm_id'] == id]
self.assertEqual(1, len(match), 'alarm %s not found' % id)
return match[0]
def _get_alarm_history(self, alarm, auth_headers=None, query=None,
expect_errors=False, status=200):
url = '/alarms/%s/history' % alarm['alarm_id']
if query:
url += '?q.op=%(op)s&q.value=%(value)s&q.field=%(field)s' % query
resp = self.get_json(url,
headers=auth_headers or self.auth_headers,
expect_errors=expect_errors)
if expect_errors:
self.assertEqual(status, resp.status_code)
return resp
def _update_alarm(self, alarm, updated_data, auth_headers=None):
data = self._get_alarm(alarm['alarm_id'])
data.update(updated_data)
self.put_json('/alarms/%s' % alarm['alarm_id'],
params=data,
headers=auth_headers or self.auth_headers)
def _delete_alarm(self, alarm, auth_headers=None):
self.delete('/alarms/%s' % alarm['alarm_id'],
headers=auth_headers or self.auth_headers,
status=204)
def _assert_is_subset(self, expected, actual):
for k, v in six.iteritems(expected):
self.assertEqual(v, actual.get(k), 'mismatched field: %s' % k)
self.assertIsNotNone(actual['event_id'])
def _assert_in_json(self, expected, actual):
actual = jsonutils.dumps(jsonutils.loads(actual), sort_keys=True)
for k, v in six.iteritems(expected):
fragment = jsonutils.dumps({k: v}, sort_keys=True)[1:-1]
self.assertIn(fragment, actual,
'%s not in %s' % (fragment, actual))
class TestListEmptyAlarms(TestAlarmsBase):
def test_empty(self):
data = self.get_json('/alarms')
self.assertEqual([], data)
class TestAlarms(v2.FunctionalTest,
tests_db.MixinTestsWithBackendScenarios):
class TestAlarms(TestAlarmsBase):
def setUp(self):
super(TestAlarms, self).setUp()
self.auth_headers = {'X-User-Id': str(uuid.uuid4()),
'X-Project-Id': str(uuid.uuid4())}
for alarm in [
models.Alarm(name='name1',
type='threshold',
@ -234,23 +298,6 @@ class TestAlarms(v2.FunctionalTest,
self.alarm_conn.update_alarm(alarm)
@staticmethod
def _add_default_threshold_rule(alarm):
if (alarm['type'] == 'threshold' and
'exclude_outliers' not in alarm['threshold_rule']):
alarm['threshold_rule']['exclude_outliers'] = False
def _verify_alarm(self, json, alarm, expected_name=None):
if expected_name and alarm.name != expected_name:
self.fail("Alarm not found")
self._add_default_threshold_rule(json)
for key in json:
if key.endswith('_rule'):
storage_key = 'rule'
else:
storage_key = key
self.assertEqual(json[key], getattr(alarm, storage_key))
def test_list_alarms(self):
data = self.get_json('/alarms')
self.assertEqual(7, len(data))
@ -2218,48 +2265,6 @@ class TestAlarms(v2.FunctionalTest,
params='not valid',
status=400)
def _get_alarm(self, id):
data = self.get_json('/alarms')
match = [a for a in data if a['alarm_id'] == id]
self.assertEqual(1, len(match), 'alarm %s not found' % id)
return match[0]
def _get_alarm_history(self, alarm, auth_headers=None, query=None,
expect_errors=False, status=200):
url = '/alarms/%s/history' % alarm['alarm_id']
if query:
url += '?q.op=%(op)s&q.value=%(value)s&q.field=%(field)s' % query
resp = self.get_json(url,
headers=auth_headers or self.auth_headers,
expect_errors=expect_errors)
if expect_errors:
self.assertEqual(status, resp.status_code)
return resp
def _update_alarm(self, alarm, updated_data, auth_headers=None):
data = self._get_alarm(alarm['alarm_id'])
data.update(updated_data)
self.put_json('/alarms/%s' % alarm['alarm_id'],
params=data,
headers=auth_headers or self.auth_headers)
def _delete_alarm(self, alarm, auth_headers=None):
self.delete('/alarms/%s' % alarm['alarm_id'],
headers=auth_headers or self.auth_headers,
status=204)
def _assert_is_subset(self, expected, actual):
for k, v in six.iteritems(expected):
self.assertEqual(v, actual.get(k), 'mismatched field: %s' % k)
self.assertIsNotNone(actual['event_id'])
def _assert_in_json(self, expected, actual):
actual = jsonutils.dumps(jsonutils.loads(actual), sort_keys=True)
for k, v in six.iteritems(expected):
fragment = jsonutils.dumps({k: v}, sort_keys=True)[1:-1]
self.assertIn(fragment, actual,
'%s not in %s' % (fragment, actual))
def test_record_alarm_history_config(self):
self.CONF.set_override('record_history', False, group='alarm')
alarm = self._get_alarm('a')
@ -2792,14 +2797,7 @@ class TestAlarms(v2.FunctionalTest,
self._verify_alarm(json, alarms[0])
class TestAlarmsQuotas(v2.FunctionalTest,
tests_db.MixinTestsWithBackendScenarios):
def setUp(self):
super(TestAlarmsQuotas, self).setUp()
self.auth_headers = {'X-User-Id': str(uuid.uuid4()),
'X-Project-Id': str(uuid.uuid4())}
class TestAlarmsQuotas(TestAlarmsBase):
def _test_alarm_quota(self):
alarm = {