Ensure alarm rule conform to alarm type

Threshold_rule or combination_rule must be set to an Alarm object,
and they cannot be set at the same time.
Alarm's type must be one of threshold and combination.
Threshold_rule and combination_rule should be validated according alarm's
type.

Change-Id: I97bf82defeaf0dc1a0e897e10886fd736361041d
Closes-bug: #1288677
This commit is contained in:
liu-sheng 2014-03-06 19:56:32 +08:00
parent 39357afff1
commit 2c23169109
2 changed files with 72 additions and 13 deletions

View File

@ -1708,17 +1708,8 @@ class Alarm(_Base):
@staticmethod
def validate(alarm):
if (alarm.threshold_rule in (wtypes.Unset, None)
and alarm.combination_rule in (wtypes.Unset, None)):
error = _("either threshold_rule or combination_rule "
"must be set")
raise ClientSideError(error)
if alarm.threshold_rule and alarm.combination_rule:
error = _("threshold_rule and combination_rule "
"cannot be set at the same time")
raise ClientSideError(error)
Alarm.check_rule(alarm)
if alarm.threshold_rule:
# ensure an implicit constraint on project_id is added to
# the query if not already present
@ -1739,6 +1730,18 @@ class Alarm(_Base):
return alarm
@staticmethod
def check_rule(alarm):
rule = '%s_rule' % alarm.type
if getattr(alarm, rule) in (wtypes.Unset, None):
error = _("%(rule)s must be set for %(type)s"
" type alarm") % {"rule": rule, "type": alarm.type}
raise ClientSideError(error)
if alarm.threshold_rule and alarm.combination_rule:
error = _("threshold_rule and combination_rule "
"cannot be set at the same time")
raise ClientSideError(error)
@classmethod
def sample(cls):
return cls(alarm_id=None,

View File

@ -409,9 +409,8 @@ class TestAlarms(FunctionalTest,
resp = self.post_json('/alarms', params=json, expect_errors=True,
status=400, headers=self.auth_headers)
self.assertEqual(
resp.json['error_message']['faultstring'],
"either threshold_rule or combination_rule "
"must be set")
"threshold_rule must be set for threshold type alarm",
resp.json['error_message']['faultstring'])
def test_post_invalid_alarm_statistic(self):
json = {
@ -930,6 +929,63 @@ class TestAlarms(FunctionalTest,
def test_post_combination_alarm_as_admin_success_owner_set(self):
self._do_post_combination_alarm_as_admin_success(True)
def test_post_combination_alarm_with_threshold_rule(self):
"""Test the creation of an combination alarm with threshold rule.
"""
json = {
'enabled': False,
'name': 'added_alarm',
'state': 'ok',
'type': 'combination',
'ok_actions': ['http://something/ok'],
'alarm_actions': ['http://something/alarm'],
'insufficient_data_actions': ['http://something/no'],
'repeat_actions': True,
'threshold_rule': {
'meter_name': 'ameter',
'query': [{'field': 'metadata.field',
'op': 'eq',
'value': '5',
'type': 'string'}],
'comparison_operator': 'le',
'statistic': 'count',
'threshold': 50,
'evaluation_periods': '3',
'period': '180',
}
}
resp = self.post_json('/alarms', params=json,
expect_errors=True, status=400,
headers=self.auth_headers)
self.assertEqual(
"combination_rule must be set for combination type alarm",
resp.json['error_message']['faultstring'])
def test_post_threshold_alarm_with_combination_rule(self):
"""Test the creation of an threshold alarm with combination rule.
"""
json = {
'enabled': False,
'name': 'added_alarm',
'state': 'ok',
'type': 'threshold',
'ok_actions': ['http://something/ok'],
'alarm_actions': ['http://something/alarm'],
'insufficient_data_actions': ['http://something/no'],
'repeat_actions': True,
'combination_rule': {
'alarm_ids': ['a',
'b'],
'operator': 'and',
}
}
resp = self.post_json('/alarms', params=json,
expect_errors=True, status=400,
headers=self.auth_headers)
self.assertEqual(
"threshold_rule must be set for threshold type alarm",
resp.json['error_message']['faultstring'])
def _do_post_combination_alarm_as_admin_success(self, owner_is_set):
"""Test that post a combination alarm as admin on behalf of nobody
with an alarm_id of someone else, with owner set or not