Fix validation error for invalid field name in simple query

The field name validation code runs only in case of the operator that is
specified for the invalid field name, is 'eq'. Otherwise the validation
ends with an invalid operator error. The field name validation code was
modified to check the field name first and only checks the operators for
valid field names.

Closes-bug: #1294628
Change-Id: Id34ae882940b36dec3ea81043b0d29b996a38426
This commit is contained in:
Ildiko Vancsa 2014-03-19 15:31:31 +01:00
parent f2019732bc
commit 50cffbc08f
3 changed files with 103 additions and 18 deletions

View File

@ -378,24 +378,28 @@ def _validate_query(query, db_func, internal_keys=[],
"search_offset cannot be used without " +
"timestamp")
def _is_field_metadata(field):
return (field.startswith('metadata.') or
field.startswith('resource_metadata.'))
for i in query:
if i.field not in ('timestamp', 'search_offset'):
if i.op == 'eq':
if i.field == 'enabled':
key = translation.get(i.field, i.field)
operator = i.op
if (key in valid_keys or _is_field_metadata(i.field)):
if operator == 'eq':
if key == 'enabled':
i._get_value_as_type('boolean')
elif (i.field.startswith('metadata.') or
i.field.startswith('resource_metadata.')):
elif _is_field_metadata(key):
i._get_value_as_type()
else:
key = translation.get(i.field, i.field)
if key not in valid_keys:
raise wsme.exc.InvalidInput('op', i.op,
'unimplemented operator for '
'%s' % i.field)
else:
msg = ("unrecognized field in query: %s, "
"valid keys: %s") % (query, valid_keys)
raise wsme.exc.UnknownArgument(key, msg)
else:
raise wsme.exc.InvalidInput('op', i.op,
'unimplemented operator for %s' %
i.field)
def _validate_timestamp_fields(query, field_name, operator_list,

View File

@ -447,6 +447,30 @@ class TestAlarms(FunctionalTest,
alarms = list(self.conn.get_alarms())
self.assertEqual(4, len(alarms))
def test_post_invalid_alarm_query_field_type(self):
json = {
'name': 'added_alarm',
'type': 'threshold',
'threshold_rule': {
'meter_name': 'ameter',
'query': [{'field': 'metadata.valid',
'op': 'eq',
'value': 'value',
'type': 'blob'}],
'comparison_operator': 'gt',
'threshold': 2.0,
'statistic': 'avg',
}
}
resp = self.post_json('/alarms', params=json, expect_errors=True,
status=400, headers=self.auth_headers)
expected_error_message = 'The data type blob is not supported.'
resp_string = jsonutils.loads(resp.body)
fault_string = resp_string['error_message']['faultstring']
self.assertTrue(fault_string.startswith(expected_error_message))
alarms = list(self.conn.get_alarms())
self.assertEqual(4, len(alarms))
def test_post_invalid_alarm_have_multiple_rules(self):
json = {
'name': 'added_alarm',

View File

@ -189,6 +189,67 @@ class TestListMeters(FunctionalTest,
['faultstring'])
def test_list_samples(self):
data = self.get_json('/samples')
self.assertEqual(5, len(data))
def test_query_samples_with_invalid_field_name_and_non_eq_operator(self):
resp = self.get_json('/samples',
q=[{'field': 'non_valid_field_name',
'op': 'gt',
'value': 3}],
expect_errors=True)
resp_string = jsonutils.loads(resp.body)
fault_string = resp_string['error_message']['faultstring']
expected_error_message = ('Unknown argument: "non_valid_field_name"'
': unrecognized field in query: '
'[<Query u\'non_valid_field_name\' '
'gt u\'3\' None>]')
self.assertEqual(400, resp.status_code)
self.assertTrue(fault_string.startswith(expected_error_message))
def test_query_samples_with_invalid_field_name_and_eq_operator(self):
resp = self.get_json('/samples',
q=[{'field': 'non_valid_field_name',
'op': 'eq',
'value': 3}],
expect_errors=True)
resp_string = jsonutils.loads(resp.body)
fault_string = resp_string['error_message']['faultstring']
expected_error_message = ('Unknown argument: "non_valid_field_name"'
': unrecognized field in query: '
'[<Query u\'non_valid_field_name\' '
'eq u\'3\' None>]')
self.assertEqual(400, resp.status_code)
self.assertTrue(fault_string.startswith(expected_error_message))
def test_query_samples_with_invalid_operator_and_valid_field_name(self):
resp = self.get_json('/samples',
q=[{'field': 'project_id',
'op': 'lt',
'value': '3'}],
expect_errors=True)
resp_string = jsonutils.loads(resp.body)
fault_string = resp_string['error_message']['faultstring']
expected_error_message = ("Invalid input for field/attribute op. " +
"Value: 'lt'. unimplemented operator for" +
" project_id")
self.assertEqual(400, resp.status_code)
self.assertEqual(fault_string, expected_error_message)
def test_list_meters_query_wrong_type_metadata(self):
resp = self.get_json('/meters/meter.test',
q=[{'field': 'metadata.size',
'op': 'eq',
'value': '0',
'type': 'blob'}],
expect_errors=True
)
expected_error_message = 'The data type blob is not supported.'
resp_string = jsonutils.loads(resp.body)
fault_string = resp_string['error_message']['faultstring']
self.assertTrue(fault_string.startswith(expected_error_message))
def test_query_samples_with_search_offset(self):
resp = self.get_json('/samples',
q=[{'field': 'search_offset',
'op': 'eq',
@ -201,10 +262,6 @@ class TestListMeters(FunctionalTest,
jsonutils.loads(resp.body)['error_message']
['faultstring'])
def test_query_samples_with_search_offset(self):
data = self.get_json('/samples')
self.assertEqual(5, len(data))
def test_list_meters_with_dict_metadata(self):
data = self.get_json('/meters/meter.mine',
q=[{'field':