Fix 500 error on invalid security-group-rule creation for NVP

Validate that, when creating rules specifying a protocol different
from TCP, UDP, ICMP and ICMPv6, no port (range) is specified because
NVP does not like it.

The extra validation is specific to the NVP plugin because other
plugins may choose a different failure mode. For example, the ovs
plugin does not complain at all, but it just ignores the port range
if the protocol is not a port-oriented one.

Fixes bug #1213293

Change-Id: I46e9032b5cf7f7d88b9d05c3bf020784b04217d9
This commit is contained in:
armando-migliaccio 2013-08-20 16:51:29 -07:00
parent 589071a1fb
commit a8c064ed32
5 changed files with 55 additions and 8 deletions

View File

@ -38,6 +38,7 @@ IPv6 = 'IPv6'
ICMP_PROTOCOL = 1
TCP_PROTOCOL = 6
UDP_PROTOCOL = 17
ICMPv6_PROTOCOL = 58
DHCP_RESPONSE_PORT = 68

View File

@ -32,7 +32,8 @@ from neutron.openstack.common import uuidutils
IP_PROTOCOL_MAP = {'tcp': constants.TCP_PROTOCOL,
'udp': constants.UDP_PROTOCOL,
'icmp': constants.ICMP_PROTOCOL}
'icmp': constants.ICMP_PROTOCOL,
'icmpv6': constants.ICMPv6_PROTOCOL}
class SecurityGroup(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):

View File

@ -2140,6 +2140,21 @@ class NvpPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
return super(NvpPluginV2, self).delete_security_group(
context, security_group_id)
def _validate_security_group_rules(self, context, rules):
for rule in rules['security_group_rules']:
r = rule.get('security_group_rule')
port_based_proto = (self._get_ip_proto_number(r['protocol'])
in securitygroups_db.IP_PROTOCOL_MAP.values())
if (not port_based_proto and
(r['port_range_min'] is not None or
r['port_range_max'] is not None)):
msg = (_("Port values not valid for "
"protocol: %s") % r['protocol'])
raise q_exc.BadRequest(resource='security_group_rule',
msg=msg)
return super(NvpPluginV2, self)._validate_security_group_rules(context,
rules)
def create_security_group_rule(self, context, security_group_rule):
"""Create a single security group rule."""
bulk_rule = {'security_group_rules': [security_group_rule]}

View File

@ -359,6 +359,21 @@ class TestNiciraSecurityGroup(ext_sg.TestSecurityGroups,
# Assert Neutron name is not truncated
self.assertEqual(sg['security_group']['name'], name)
def test_create_security_group_rule_bad_input(self):
name = 'foo security group'
description = 'foo description'
with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id']
protocol = 200
min_range = 32
max_range = 4343
rule = self._build_security_group_rule(
security_group_id, 'ingress', protocol,
min_range, max_range)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
class TestNiciraL3NatTestCase(test_l3_plugin.L3NatDBTestCase,
NiciraPluginV2TestCase):

View File

@ -71,20 +71,24 @@ class SecurityGroupsTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
context.Context('', kwargs['tenant_id']))
return security_group_req.get_response(self.ext_api)
def _build_security_group_rule(self, security_group_id, direction,
protocol, port_range_min, port_range_max,
def _build_security_group_rule(self, security_group_id, direction, proto,
port_range_min=None, port_range_max=None,
remote_ip_prefix=None, remote_group_id=None,
tenant_id='test_tenant',
ethertype='IPv4'):
data = {'security_group_rule': {'security_group_id': security_group_id,
'direction': direction,
'protocol': protocol,
'protocol': proto,
'ethertype': ethertype,
'port_range_min': port_range_min,
'port_range_max': port_range_max,
'tenant_id': tenant_id,
'ethertype': ethertype}}
if port_range_min:
data['security_group_rule']['port_range_min'] = port_range_min
if port_range_max:
data['security_group_rule']['port_range_max'] = port_range_max
if remote_ip_prefix:
data['security_group_rule']['remote_ip_prefix'] = remote_ip_prefix
@ -408,6 +412,18 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
def test_create_security_group_rule_tcp_protocol_as_number(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id']
protocol = 6 # TCP
rule = self._build_security_group_rule(
security_group_id, 'ingress', protocol, '22', '22')
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 201)
def test_create_security_group_rule_protocol_as_number(self):
name = 'webservers'
description = 'my webservers'
@ -415,8 +431,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id = sg['security_group']['id']
protocol = 2
rule = self._build_security_group_rule(
security_group_id, 'ingress', protocol, '22', '22',
None, None)
security_group_id, 'ingress', protocol)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 201)