Handle manually mandatory field

wsme have a couple of bugs:
 https://bugs.launchpad.net/wsme/+bug/1227004
 https://bugs.launchpad.net/wsme/+bug/1227039

This patch workaround these bugs by doing input validation manually

Change-Id: Id16655ce4c5546b1caededad70a6b9a238e4be20
This commit is contained in:
Mehdi Abaakouk 2013-09-20 15:08:30 +02:00
parent cd96b61099
commit d1cb53db89
2 changed files with 81 additions and 3 deletions

View File

@ -943,8 +943,19 @@ class AlarmThresholdRule(_Base):
@staticmethod
def validate(threshold_rule):
#note(sileht): wsme mandatory doesn't work as expected
#workaround for https://bugs.launchpad.net/wsme/+bug/1227004
for field in ['meter_name', 'threshold']:
if not getattr(threshold_rule, field):
error = _("threshold_rule/%s is mandatory") % field
pecan.response.translatable_error = error
raise wsme.exc.ClientSideError(unicode(error))
#note(sileht): wsme default doesn't work in some case
#workaround for https://bugs.launchpad.net/wsme/+bug/1227039
if not threshold_rule.query:
threshold_rule.query = []
#note(sileht): _query_to_kwargs implicitly call _sanitize_query
#that add project_id in query
_query_to_kwargs(threshold_rule.query, storage.SampleFilter.__init__,
@ -1007,6 +1018,13 @@ class AlarmCombinationRule(_Base):
@staticmethod
def validate(combination_rule):
#note(sileht): wsme mandatory doesn't works as expected
#workaround for https://bugs.launchpad.net/wsme/+bug/1227004
if not combination_rule.alarm_ids:
error = _("combination_rule/alarm_ids is mandatory")
pecan.response.translatable_error = error
raise wsme.exc.ClientSideError(unicode(error))
for id in combination_rule.alarm_ids:
auth_project = acl.get_limited_to_project(pecan.request.headers)
alarms = list(pecan.request.storage_conn.get_alarms(
@ -1097,11 +1115,19 @@ class Alarm(_Base):
@staticmethod
def validate(alarm):
#note(sileht): wsme mandatory doesn't work as expected
#workaround for https://bugs.launchpad.net/wsme/+bug/1227004
for field in ['name', 'type']:
if not getattr(alarm, field):
error = _("%s is mandatory") % field
pecan.response.translatable_error = error
raise wsme.exc.ClientSideError(unicode(error))
if alarm.threshold_rule and alarm.combination_rule:
error = _("threshold_rule and combination_rule "
"cannot be set at the same time")
pecan.response.translatable_error = error
raise wsme.exc.ClientSideError(error)
raise wsme.exc.ClientSideError(unicode(error))
return alarm
@classmethod

View File

@ -203,6 +203,52 @@ class TestAlarms(FunctionalTest,
self.assertEqual(one['alarm_id'], alarms[0]['alarm_id'])
self.assertEqual(one['repeat_actions'], alarms[0]['repeat_actions'])
def test_post_alarm_wsme_workaround(self):
jsons = {
'type': {
'name': 'missing type',
'threshold_rule': {
'meter_name': 'ameter',
'threshold': 2.0,
}
},
'name': {
'type': 'threshold',
'threshold_rule': {
'meter_name': 'ameter',
'threshold': 2.0,
}
},
'threshold_rule/meter_name': {
'name': 'missing meter_name',
'type': 'threshold',
'threshold_rule': {
'threshold': 2.0,
}
},
'threshold_rule/threshold': {
'name': 'missing threshold',
'type': 'threshold',
'threshold_rule': {
'meter_name': 'ameter',
}
},
'combination_rule/alarm_ids': {
'name': 'missing alarm_ids',
'type': 'combination',
'combination_rule': {}
}
}
for field, json in jsons.iteritems():
resp = self.post_json('/alarms', params=json, expect_errors=True,
status=400, headers=self.auth_headers)
self.assertEqual(
resp.json['error_message'],
'{"debuginfo": null, "faultcode": "Client", "faultstring": '
'"%s is mandatory"}' % field)
alarms = list(self.conn.get_alarms())
self.assertEqual(4, len(alarms))
def test_post_invalid_alarm_period(self):
json = {
'name': 'added_alarm_invalid_period',
@ -261,6 +307,7 @@ class TestAlarms(FunctionalTest,
'name': 'added_alarm',
'type': 'threshold',
'threshold_rule': {
'meter_name': 'ameter',
'query': [{'field': 'meter',
'value': 'ameter'}],
'comparison_operator': 'gt',
@ -271,10 +318,15 @@ class TestAlarms(FunctionalTest,
}
}
self.post_json('/alarms', params=json, expect_errors=True, status=400,
headers=self.auth_headers)
resp = self.post_json('/alarms', params=json, expect_errors=True,
status=400, headers=self.auth_headers)
alarms = list(self.conn.get_alarms())
self.assertEqual(4, len(alarms))
self.assertEqual(
resp.json['error_message'],
'{"debuginfo": null, "faultcode": "Client", "faultstring": '
'"threshold_rule and combination_rule cannot '
'be set at the same time"}')
def test_post_alarm_defaults(self):
to_check = {