convert query string according to attr map.

Bug #1035774

Change-Id: I162239ab2baa82636d270689ed3acd0cb6c336d4
This commit is contained in:
Yong Sheng Gong 2012-08-12 10:43:09 +08:00
parent 80f35da022
commit f8c067e810
2 changed files with 67 additions and 10 deletions

View File

@ -54,14 +54,14 @@ def _get_hostname():
cfg.CONF.register_opt(cfg.StrOpt('host', default=_get_hostname())) cfg.CONF.register_opt(cfg.StrOpt('host', default=_get_hostname()))
def fields(request): def _fields(request):
""" """
Extracts the list of fields to return Extracts the list of fields to return
""" """
return [v for v in request.GET.getall('fields') if v] return [v for v in request.GET.getall('fields') if v]
def filters(request): def _filters(request, attr_info):
""" """
Extracts the filters from the request string Extracts the filters from the request string
@ -77,14 +77,27 @@ def filters(request):
for key in set(request.GET): for key in set(request.GET):
if key in ('verbose', 'fields'): if key in ('verbose', 'fields'):
continue continue
values = [v for v in request.GET.getall(key) if v] values = [v for v in request.GET.getall(key) if v]
if values: if not attr_info.get(key) and values:
res[key] = values res[key] = values
continue
result_values = []
convert_to = (attr_info.get(key) and attr_info[key].get('convert_to')
or None)
for value in values:
if convert_to:
try:
result_values.append(convert_to(value))
except exceptions.InvalidInput as e:
raise webob.exc.HTTPUnprocessableEntity(str(e))
else:
result_values.append(value)
if result_values:
res[key] = result_values
return res return res
def verbose(request): def _verbose(request):
""" """
Determines the verbose fields for a request Determines the verbose fields for a request
@ -160,9 +173,9 @@ class Controller(object):
# NOTE(salvatore-orlando): The following ensures that fields which # NOTE(salvatore-orlando): The following ensures that fields which
# are needed for authZ policy validation are not stripped away by the # are needed for authZ policy validation are not stripped away by the
# plugin before returning. # plugin before returning.
original_fields, fields_to_add = self._do_field_list(fields(request)) original_fields, fields_to_add = self._do_field_list(_fields(request))
kwargs = {'filters': filters(request), kwargs = {'filters': _filters(request, self._attr_info),
'verbose': verbose(request), 'verbose': _verbose(request),
'fields': original_fields} 'fields': original_fields}
obj_getter = getattr(self._plugin, "get_%s" % self._collection) obj_getter = getattr(self._plugin, "get_%s" % self._collection)
obj_list = obj_getter(request.context, **kwargs) obj_list = obj_getter(request.context, **kwargs)
@ -183,7 +196,7 @@ class Controller(object):
def _item(self, request, id, do_authz=False, field_list=None): def _item(self, request, id, do_authz=False, field_list=None):
"""Retrieves and formats a single element of the requested entity""" """Retrieves and formats a single element of the requested entity"""
kwargs = {'verbose': verbose(request), kwargs = {'verbose': _verbose(request),
'fields': field_list} 'fields': field_list}
action = "get_%s" % self._resource action = "get_%s" % self._resource
obj_getter = getattr(self._plugin, action) obj_getter = getattr(self._plugin, action)
@ -205,7 +218,7 @@ class Controller(object):
# NOTE(salvatore-orlando): The following ensures that fields # NOTE(salvatore-orlando): The following ensures that fields
# which are needed for authZ policy validation are not stripped # which are needed for authZ policy validation are not stripped
# away by the plugin before returning. # away by the plugin before returning.
field_list, added_fields = self._do_field_list(fields(request)) field_list, added_fields = self._do_field_list(_fields(request))
return {self._resource: return {self._resource:
self._view(self._item(request, self._view(self._item(request,
id, id,

View File

@ -1313,6 +1313,30 @@ class TestNetworksV2(QuantumDbPluginV2TestCase):
self.assertEquals(res['networks'][1]['name'], self.assertEquals(res['networks'][1]['name'],
net2['network']['name']) net2['network']['name'])
def test_list_networks_with_parameters(self):
with self.network(name='net1', admin_status_up=False) as net1:
with self.network(name='net2') as net2:
req = self.new_list_request('networks',
params='admin_state_up=False')
res = self.deserialize('json', req.get_response(self.api))
self.assertEquals(1, len(res['networks']))
self.assertEquals(res['networks'][0]['name'],
net1['network']['name'])
req = self.new_list_request('networks',
params='admin_state_up=true')
res = self.deserialize('json', req.get_response(self.api))
self.assertEquals(1, len(res['networks']))
self.assertEquals(res['networks'][0]['name'],
net2['network']['name'])
def test_list_networks_with_parameters_invalid_values(self):
with self.network(name='net1', admin_status_up=False) as net1:
with self.network(name='net2') as net2:
req = self.new_list_request('networks',
params='admin_state_up=fake')
res = req.get_response(self.api)
self.assertEquals(422, res.status_int)
def test_show_network(self): def test_show_network(self):
with self.network(name='net1') as net: with self.network(name='net1') as net:
req = self.new_show_request('networks', net['network']['id']) req = self.new_show_request('networks', net['network']['id'])
@ -1672,6 +1696,26 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase):
self.assertEquals(res2['cidr'], self.assertEquals(res2['cidr'],
subnet2['subnet']['cidr']) subnet2['subnet']['cidr'])
def test_list_subnets_with_parameter(self):
# NOTE(jkoelker) This would be a good place to use contextlib.nested
# or just drop 2.6 support ;)
with self.network() as network:
with self.subnet(network=network, gateway_ip='10.0.0.1',
cidr='10.0.0.0/24') as subnet:
with self.subnet(network=network, gateway_ip='10.0.1.1',
cidr='10.0.1.0/24') as subnet2:
req = self.new_list_request(
'subnets',
params='ip_version=4&ip_version=6')
res = self.deserialize('json',
req.get_response(self.api))
self.assertEquals(2, len(res['subnets']))
req = self.new_list_request('subnets',
params='ip_version=6')
res = self.deserialize('json',
req.get_response(self.api))
self.assertEquals(0, len(res['subnets']))
def test_invalid_ip_version(self): def test_invalid_ip_version(self):
with self.network() as network: with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'], data = {'subnet': {'network_id': network['network']['id'],