Merge "Change hard coded numbers to constants in security group tests"

This commit is contained in:
Jenkins 2013-09-26 09:15:38 +00:00 committed by Gerrit Code Review
commit f220b39d57
7 changed files with 223 additions and 189 deletions

View File

@ -35,11 +35,6 @@ METERING_LABEL_KEY = '_metering_labels'
IPv4 = 'IPv4' IPv4 = 'IPv4'
IPv6 = 'IPv6' IPv6 = 'IPv6'
ICMP_PROTOCOL = 1
TCP_PROTOCOL = 6
UDP_PROTOCOL = 17
ICMPv6_PROTOCOL = 58
DHCP_RESPONSE_PORT = 68 DHCP_RESPONSE_PORT = 68
MIN_VLAN_TAG = 1 MIN_VLAN_TAG = 1
@ -87,3 +82,13 @@ PORT_BINDING_EXT_ALIAS = 'binding'
L3_AGENT_SCHEDULER_EXT_ALIAS = 'l3_agent_scheduler' L3_AGENT_SCHEDULER_EXT_ALIAS = 'l3_agent_scheduler'
DHCP_AGENT_SCHEDULER_EXT_ALIAS = 'dhcp_agent_scheduler' DHCP_AGENT_SCHEDULER_EXT_ALIAS = 'dhcp_agent_scheduler'
LBAAS_AGENT_SCHEDULER_EXT_ALIAS = 'lbaas_agent_scheduler' LBAAS_AGENT_SCHEDULER_EXT_ALIAS = 'lbaas_agent_scheduler'
# Protocol names and numbers for Security Groups/Firewalls
PROTO_NAME_TCP = 'tcp'
PROTO_NAME_ICMP = 'icmp'
PROTO_NAME_ICMP_V6 = 'icmpv6'
PROTO_NAME_UDP = 'udp'
PROTO_NUM_TCP = 6
PROTO_NUM_ICMP = 1
PROTO_NUM_ICMP_V6 = 58
PROTO_NUM_UDP = 17

View File

@ -30,10 +30,10 @@ from neutron.extensions import securitygroup as ext_sg
from neutron.openstack.common import uuidutils from neutron.openstack.common import uuidutils
IP_PROTOCOL_MAP = {'tcp': constants.TCP_PROTOCOL, IP_PROTOCOL_MAP = {constants.PROTO_NAME_TCP: constants.PROTO_NUM_TCP,
'udp': constants.UDP_PROTOCOL, constants.PROTO_NAME_UDP: constants.PROTO_NUM_UDP,
'icmp': constants.ICMP_PROTOCOL, constants.PROTO_NAME_ICMP: constants.PROTO_NUM_ICMP,
'icmpv6': constants.ICMPv6_PROTOCOL} constants.PROTO_NAME_ICMP_V6: constants.PROTO_NUM_ICMP_V6}
class SecurityGroup(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant): class SecurityGroup(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
@ -304,13 +304,13 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
if not rule['protocol']: if not rule['protocol']:
raise ext_sg.SecurityGroupProtocolRequiredWithPorts() raise ext_sg.SecurityGroupProtocolRequiredWithPorts()
ip_proto = self._get_ip_proto_number(rule['protocol']) ip_proto = self._get_ip_proto_number(rule['protocol'])
if ip_proto in [constants.TCP_PROTOCOL, constants.UDP_PROTOCOL]: if ip_proto in [constants.PROTO_NUM_TCP, constants.PROTO_NUM_UDP]:
if (rule['port_range_min'] is not None and if (rule['port_range_min'] is not None and
rule['port_range_min'] <= rule['port_range_max']): rule['port_range_min'] <= rule['port_range_max']):
pass pass
else: else:
raise ext_sg.SecurityGroupInvalidPortRange() raise ext_sg.SecurityGroupInvalidPortRange()
elif ip_proto == constants.ICMP_PROTOCOL: elif ip_proto == constants.PROTO_NUM_ICMP:
for attr, field in [('port_range_min', 'type'), for attr, field in [('port_range_min', 'type'),
('port_range_max', 'code')]: ('port_range_max', 'code')]:
if rule[attr] > 255: if rule[attr] > 255:

View File

@ -23,6 +23,7 @@ from oslo.config import cfg
from neutron.api import extensions from neutron.api import extensions
from neutron.api.v2 import attributes as attr from neutron.api.v2 import attributes as attr
from neutron.api.v2 import base from neutron.api.v2 import base
from neutron.common import constants as const
from neutron.common import exceptions as qexception from neutron.common import exceptions as qexception
from neutron import manager from neutron import manager
from neutron.openstack.common import uuidutils from neutron.openstack.common import uuidutils
@ -158,7 +159,8 @@ def _validate_name_not_default(data, valid_values=None):
attr.validators['type:name_not_default'] = _validate_name_not_default attr.validators['type:name_not_default'] = _validate_name_not_default
sg_supported_protocols = [None, 'tcp', 'udp', 'icmp'] sg_supported_protocols = [None, const.PROTO_NAME_TCP,
const.PROTO_NAME_UDP, const.PROTO_NAME_ICMP]
sg_supported_ethertypes = ['IPv4', 'IPv6'] sg_supported_ethertypes = ['IPv4', 'IPv6']
# Attribute Map # Attribute Map

View File

@ -61,8 +61,8 @@ def get_protocol_value(protocol):
return protocol return protocol
mapping = { mapping = {
'tcp': constants.TCP_PROTOCOL, constants.PROTO_NAME_TCP: constants.PROTO_NUM_TCP,
'udp': constants.UDP_PROTOCOL, constants.PROTO_NAME_UDP: constants.PROTO_NUM_UDP,
'icmp': constants.ICMP_PROTOCOL constants.PROTO_NAME_ICMP: constants.PROTO_NUM_ICMP
} }
return mapping.get(protocol.lower()) return mapping.get(protocol.lower())

View File

@ -1073,7 +1073,7 @@ def create_security_profile(cluster, tenant_id, security_profile):
# Allow all dhcp responses and all ingress traffic # Allow all dhcp responses and all ingress traffic
hidden_rules = {'logical_port_egress_rules': hidden_rules = {'logical_port_egress_rules':
[{'ethertype': 'IPv4', [{'ethertype': 'IPv4',
'protocol': constants.UDP_PROTOCOL, 'protocol': constants.PROTO_NUM_UDP,
'port_range_min': constants.DHCP_RESPONSE_PORT, 'port_range_min': constants.DHCP_RESPONSE_PORT,
'port_range_max': constants.DHCP_RESPONSE_PORT, 'port_range_max': constants.DHCP_RESPONSE_PORT,
'ip_prefix': '0.0.0.0/0'}], 'ip_prefix': '0.0.0.0/0'}],
@ -1111,7 +1111,7 @@ def update_security_group_rules(cluster, spid, rules):
# Allow all dhcp responses in # Allow all dhcp responses in
rules['logical_port_egress_rules'].append( rules['logical_port_egress_rules'].append(
{'ethertype': 'IPv4', 'protocol': constants.UDP_PROTOCOL, {'ethertype': 'IPv4', 'protocol': constants.PROTO_NUM_UDP,
'port_range_min': constants.DHCP_RESPONSE_PORT, 'port_range_min': constants.DHCP_RESPONSE_PORT,
'port_range_max': constants.DHCP_RESPONSE_PORT, 'port_range_max': constants.DHCP_RESPONSE_PORT,
'ip_prefix': '0.0.0.0/0'}) 'ip_prefix': '0.0.0.0/0'})

View File

@ -20,6 +20,7 @@ import mock
import webob.exc import webob.exc
from neutron.api.v2 import attributes as attr from neutron.api.v2 import attributes as attr
from neutron.common import constants as const
from neutron.common.test_lib import test_config from neutron.common.test_lib import test_config
from neutron import context from neutron import context
from neutron.db import db_base_plugin_v2 from neutron.db import db_base_plugin_v2
@ -75,7 +76,7 @@ class SecurityGroupsTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
port_range_min=None, port_range_max=None, port_range_min=None, port_range_max=None,
remote_ip_prefix=None, remote_group_id=None, remote_ip_prefix=None, remote_group_id=None,
tenant_id='test_tenant', tenant_id='test_tenant',
ethertype='IPv4'): ethertype=const.IPv4):
data = {'security_group_rule': {'security_group_id': security_group_id, data = {'security_group_rule': {'security_group_id': security_group_id,
'direction': direction, 'direction': direction,
@ -110,13 +111,13 @@ class SecurityGroupsTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
def _make_security_group(self, fmt, name, description, **kwargs): def _make_security_group(self, fmt, name, description, **kwargs):
res = self._create_security_group(fmt, name, description, **kwargs) res = self._create_security_group(fmt, name, description, **kwargs)
if res.status_int >= 400: if res.status_int >= webob.exc.HTTPBadRequest.code:
raise webob.exc.HTTPClientError(code=res.status_int) raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res) return self.deserialize(fmt, res)
def _make_security_group_rule(self, fmt, rules, **kwargs): def _make_security_group_rule(self, fmt, rules, **kwargs):
res = self._create_security_group_rule(self.fmt, rules) res = self._create_security_group_rule(self.fmt, rules)
if res.status_int >= 400: if res.status_int >= webob.exc.HTTPBadRequest.code:
raise webob.exc.HTTPClientError(code=res.status_int) raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res) return self.deserialize(fmt, res)
@ -136,10 +137,10 @@ class SecurityGroupsTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
@contextlib.contextmanager @contextlib.contextmanager
def security_group_rule(self, security_group_id='4cd70774-cc67-4a87-9b39-7' def security_group_rule(self, security_group_id='4cd70774-cc67-4a87-9b39-7'
'd1db38eb087', 'd1db38eb087',
direction='ingress', protocol='tcp', direction='ingress', protocol=const.PROTO_NAME_TCP,
port_range_min='22', port_range_max='22', port_range_min='22', port_range_max='22',
remote_ip_prefix=None, remote_group_id=None, remote_ip_prefix=None, remote_group_id=None,
fmt=None, no_delete=False, ethertype='IPv4'): fmt=None, no_delete=False, ethertype=const.IPv4):
if not fmt: if not fmt:
fmt = self.fmt fmt = self.fmt
rule = self._build_security_group_rule(security_group_id, rule = self._build_security_group_rule(security_group_id,
@ -263,11 +264,11 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
sg_rules = security_group['security_group']['security_group_rules'] sg_rules = security_group['security_group']['security_group_rules']
self.assertEqual(len(sg_rules), 2) self.assertEqual(len(sg_rules), 2)
v4_rules = filter(lambda x: x['ethertype'] == 'IPv4', sg_rules) v4_rules = [r for r in sg_rules if r['ethertype'] == const.IPv4]
self.assertEqual(len(v4_rules), 1) self.assertEqual(len(v4_rules), 1)
v4_rule = v4_rules[0] v4_rule = v4_rules[0]
expected = {'direction': 'egress', expected = {'direction': 'egress',
'ethertype': 'IPv4', 'ethertype': const.IPv4,
'remote_group_id': None, 'remote_group_id': None,
'remote_ip_prefix': None, 'remote_ip_prefix': None,
'protocol': None, 'protocol': None,
@ -275,11 +276,11 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
'port_range_min': None} 'port_range_min': None}
self._assert_sg_rule_has_kvs(v4_rule, expected) self._assert_sg_rule_has_kvs(v4_rule, expected)
v6_rules = filter(lambda x: x['ethertype'] == 'IPv6', sg_rules) v6_rules = [r for r in sg_rules if r['ethertype'] == const.IPv6]
self.assertEqual(len(v6_rules), 1) self.assertEqual(len(v6_rules), 1)
v6_rule = v6_rules[0] v6_rule = v6_rules[0]
expected = {'direction': 'egress', expected = {'direction': 'egress',
'ethertype': 'IPv6', 'ethertype': const.IPv6,
'remote_group_id': None, 'remote_group_id': None,
'remote_ip_prefix': None, 'remote_ip_prefix': None,
'protocol': None, 'protocol': None,
@ -309,7 +310,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
sg['security_group']['id']) sg['security_group']['id'])
req.environ['neutron.context'] = context.Context('', 'somebody') req.environ['neutron.context'] = context.Context('', 'somebody')
res = req.get_response(self.ext_api) res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 409) self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
def test_update_default_security_group_name_fail(self): def test_update_default_security_group_name_fail(self):
with self.network(): with self.network():
@ -322,7 +323,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
sg['security_groups'][0]['id']) sg['security_groups'][0]['id'])
req.environ['neutron.context'] = context.Context('', 'somebody') req.environ['neutron.context'] = context.Context('', 'somebody')
res = req.get_response(self.ext_api) res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 404) self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
def test_update_default_security_group_with_description(self): def test_update_default_security_group_with_description(self):
with self.network(): with self.network():
@ -347,7 +348,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
description = 'my webservers' description = 'my webservers'
res = self._create_security_group(self.fmt, name, description) res = self._create_security_group(self.fmt, name, description)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 409) self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
def test_list_security_groups(self): def test_list_security_groups(self):
with contextlib.nested(self.security_group(name='sg1', with contextlib.nested(self.security_group(name='sg1',
@ -406,23 +407,23 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id = sg['security_group']['id'] security_group_id = sg['security_group']['id']
ethertype = 2 ethertype = 2
rule = self._build_security_group_rule( rule = self._build_security_group_rule(
security_group_id, 'ingress', 'tcp', '22', '22', None, None, security_group_id, 'ingress', const.PROTO_NAME_TCP, '22',
ethertype=ethertype) '22', None, None, ethertype=ethertype)
res = self._create_security_group_rule(self.fmt, rule) res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400) self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_tcp_protocol_as_number(self): def test_create_security_group_rule_tcp_protocol_as_number(self):
name = 'webservers' name = 'webservers'
description = 'my webservers' description = 'my webservers'
with self.security_group(name, description) as sg: with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id'] security_group_id = sg['security_group']['id']
protocol = 6 # TCP protocol = const.PROTO_NUM_TCP # TCP
rule = self._build_security_group_rule( rule = self._build_security_group_rule(
security_group_id, 'ingress', protocol, '22', '22') security_group_id, 'ingress', protocol, '22', '22')
res = self._create_security_group_rule(self.fmt, rule) res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 201) self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
def test_create_security_group_rule_protocol_as_number(self): def test_create_security_group_rule_protocol_as_number(self):
name = 'webservers' name = 'webservers'
@ -434,7 +435,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id, 'ingress', protocol) security_group_id, 'ingress', protocol)
res = self._create_security_group_rule(self.fmt, rule) res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 201) self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
def test_create_security_group_rule_case_insensitive(self): def test_create_security_group_rule_case_insensitive(self):
name = 'webservers' name = 'webservers'
@ -457,7 +458,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self.assertEqual(rule['security_group_rule']['protocol'], self.assertEqual(rule['security_group_rule']['protocol'],
protocol.lower()) protocol.lower())
self.assertEqual(rule['security_group_rule']['ethertype'], self.assertEqual(rule['security_group_rule']['ethertype'],
'IPv4') const.IPv4)
def test_get_security_group(self): def test_get_security_group(self):
name = 'webservers' name = 'webservers'
@ -468,7 +469,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id = sg['security_group']['id'] security_group_id = sg['security_group']['id']
direction = "ingress" direction = "ingress"
remote_ip_prefix = "10.0.0.0/24" remote_ip_prefix = "10.0.0.0/24"
protocol = 'tcp' protocol = const.PROTO_NAME_TCP
port_range_min = 22 port_range_min = 22
port_range_max = 22 port_range_max = 22
keys = [('remote_ip_prefix', remote_ip_prefix), keys = [('remote_ip_prefix', remote_ip_prefix),
@ -488,8 +489,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self.assertEqual(group['security_group']['id'], self.assertEqual(group['security_group']['id'],
remote_group_id) remote_group_id)
self.assertEqual(len(sg_rule), 3) self.assertEqual(len(sg_rule), 3)
sg_rule = filter(lambda x: x['direction'] == 'ingress', sg_rule = [r for r in sg_rule if r['direction'] == 'ingress']
sg_rule)
for k, v, in keys: for k, v, in keys:
self.assertEqual(sg_rule[0][k], v) self.assertEqual(sg_rule[0][k], v)
@ -498,14 +498,15 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
description = 'my webservers' description = 'my webservers'
with self.security_group(name, description, no_delete=True) as sg: with self.security_group(name, description, no_delete=True) as sg:
remote_group_id = sg['security_group']['id'] remote_group_id = sg['security_group']['id']
self._delete('security-groups', remote_group_id, 204) self._delete('security-groups', remote_group_id,
webob.exc.HTTPNoContent.code)
def test_delete_default_security_group_admin(self): def test_delete_default_security_group_admin(self):
with self.network(): with self.network():
res = self.new_list_request('security-groups') res = self.new_list_request('security-groups')
sg = self.deserialize(self.fmt, res.get_response(self.ext_api)) sg = self.deserialize(self.fmt, res.get_response(self.ext_api))
self._delete('security-groups', sg['security_groups'][0]['id'], self._delete('security-groups', sg['security_groups'][0]['id'],
204) webob.exc.HTTPNoContent.code)
def test_delete_default_security_group_nonadmin(self): def test_delete_default_security_group_nonadmin(self):
with self.network(): with self.network():
@ -513,7 +514,8 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
sg = self.deserialize(self.fmt, res.get_response(self.ext_api)) sg = self.deserialize(self.fmt, res.get_response(self.ext_api))
neutron_context = context.Context('', 'test-tenant') neutron_context = context.Context('', 'test-tenant')
self._delete('security-groups', sg['security_groups'][0]['id'], self._delete('security-groups', sg['security_groups'][0]['id'],
409, neutron_context=neutron_context) webob.exc.HTTPConflict.code,
neutron_context=neutron_context)
def test_security_group_list_creates_default_security_group(self): def test_security_group_list_creates_default_security_group(self):
neutron_context = context.Context('', 'test-tenant') neutron_context = context.Context('', 'test-tenant')
@ -533,15 +535,15 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
# Verify default rule for v4 egress # Verify default rule for v4 egress
sg_rules = rules['security_group_rules'] sg_rules = rules['security_group_rules']
rules = filter( rules = [
lambda x: ( r for r in sg_rules
x['direction'] == 'egress' and x['ethertype'] == 'IPv4'), if r['direction'] == 'egress' and r['ethertype'] == const.IPv4
sg_rules) ]
self.assertEqual(len(rules), 1) self.assertEqual(len(rules), 1)
v4_egress = rules[0] v4_egress = rules[0]
expected = {'direction': 'egress', expected = {'direction': 'egress',
'ethertype': 'IPv4', 'ethertype': const.IPv4,
'remote_group_id': None, 'remote_group_id': None,
'remote_ip_prefix': None, 'remote_ip_prefix': None,
'protocol': None, 'protocol': None,
@ -550,15 +552,15 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self._assert_sg_rule_has_kvs(v4_egress, expected) self._assert_sg_rule_has_kvs(v4_egress, expected)
# Verify default rule for v6 egress # Verify default rule for v6 egress
rules = filter( rules = [
lambda x: ( r for r in sg_rules
x['direction'] == 'egress' and x['ethertype'] == 'IPv6'), if r['direction'] == 'egress' and r['ethertype'] == const.IPv6
sg_rules) ]
self.assertEqual(len(rules), 1) self.assertEqual(len(rules), 1)
v6_egress = rules[0] v6_egress = rules[0]
expected = {'direction': 'egress', expected = {'direction': 'egress',
'ethertype': 'IPv6', 'ethertype': const.IPv6,
'remote_group_id': None, 'remote_group_id': None,
'remote_ip_prefix': None, 'remote_ip_prefix': None,
'protocol': None, 'protocol': None,
@ -567,15 +569,15 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self._assert_sg_rule_has_kvs(v6_egress, expected) self._assert_sg_rule_has_kvs(v6_egress, expected)
# Verify default rule for v4 ingress # Verify default rule for v4 ingress
rules = filter( rules = [
lambda x: ( r for r in sg_rules
x['direction'] == 'ingress' and x['ethertype'] == 'IPv4'), if r['direction'] == 'ingress' and r['ethertype'] == const.IPv4
sg_rules) ]
self.assertEqual(len(rules), 1) self.assertEqual(len(rules), 1)
v4_ingress = rules[0] v4_ingress = rules[0]
expected = {'direction': 'ingress', expected = {'direction': 'ingress',
'ethertype': 'IPv4', 'ethertype': const.IPv4,
'remote_group_id': security_group_id, 'remote_group_id': security_group_id,
'remote_ip_prefix': None, 'remote_ip_prefix': None,
'protocol': None, 'protocol': None,
@ -584,15 +586,15 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self._assert_sg_rule_has_kvs(v4_ingress, expected) self._assert_sg_rule_has_kvs(v4_ingress, expected)
# Verify default rule for v6 ingress # Verify default rule for v6 ingress
rules = filter( rules = [
lambda x: ( r for r in sg_rules
x['direction'] == 'ingress' and x['ethertype'] == 'IPv6'), if r['direction'] == 'ingress' and r['ethertype'] == const.IPv6
sg_rules) ]
self.assertEqual(len(rules), 1) self.assertEqual(len(rules), 1)
v6_ingress = rules[0] v6_ingress = rules[0]
expected = {'direction': 'ingress', expected = {'direction': 'ingress',
'ethertype': 'IPv6', 'ethertype': const.IPv6,
'remote_group_id': security_group_id, 'remote_group_id': security_group_id,
'remote_ip_prefix': None, 'remote_ip_prefix': None,
'protocol': None, 'protocol': None,
@ -607,7 +609,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id = sg['security_group']['id'] security_group_id = sg['security_group']['id']
direction = "ingress" direction = "ingress"
remote_ip_prefix = "10.0.0.0/24" remote_ip_prefix = "10.0.0.0/24"
protocol = 'tcp' protocol = const.PROTO_NAME_TCP
port_range_min = 22 port_range_min = 22
port_range_max = 22 port_range_max = 22
keys = [('remote_ip_prefix', remote_ip_prefix), keys = [('remote_ip_prefix', remote_ip_prefix),
@ -631,7 +633,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id = sg['security_group']['id'] security_group_id = sg['security_group']['id']
direction = "ingress" direction = "ingress"
remote_group_id = sg2['security_group']['id'] remote_group_id = sg2['security_group']['id']
protocol = 'tcp' protocol = const.PROTO_NAME_TCP
port_range_min = 22 port_range_min = 22
port_range_max = 22 port_range_max = 22
keys = [('remote_group_id', remote_group_id), keys = [('remote_group_id', remote_group_id),
@ -655,7 +657,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id = sg['security_group']['id'] security_group_id = sg['security_group']['id']
direction = "ingress" direction = "ingress"
remote_ip_prefix = "10.0.0.0/24" remote_ip_prefix = "10.0.0.0/24"
protocol = 'icmp' protocol = const.PROTO_NAME_ICMP
# port_range_min (ICMP type) is greater than port_range_max # port_range_min (ICMP type) is greater than port_range_max
# (ICMP code) in order to confirm min <= max port check is # (ICMP code) in order to confirm min <= max port check is
# not called for ICMP. # not called for ICMP.
@ -681,7 +683,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id = sg['security_group']['id'] security_group_id = sg['security_group']['id']
direction = "ingress" direction = "ingress"
remote_ip_prefix = "10.0.0.0/24" remote_ip_prefix = "10.0.0.0/24"
protocol = 'icmp' protocol = const.PROTO_NAME_ICMP
# ICMP type # ICMP type
port_range_min = 8 port_range_min = 8
# ICMP code # ICMP code
@ -703,7 +705,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087" security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
direction = "ingress" direction = "ingress"
remote_ip_prefix = "10.0.0.0/24" remote_ip_prefix = "10.0.0.0/24"
protocol = 'tcp' protocol = const.PROTO_NAME_TCP
port_range_min = 22 port_range_min = 22
port_range_max = 22 port_range_max = 22
remote_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087" remote_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087"
@ -714,13 +716,13 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
remote_group_id) remote_group_id)
res = self._create_security_group_rule(self.fmt, rule) res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400) self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_bad_security_group_id(self): def test_create_security_group_rule_bad_security_group_id(self):
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087" security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
direction = "ingress" direction = "ingress"
remote_ip_prefix = "10.0.0.0/24" remote_ip_prefix = "10.0.0.0/24"
protocol = 'tcp' protocol = const.PROTO_NAME_TCP
port_range_min = 22 port_range_min = 22
port_range_max = 22 port_range_max = 22
rule = self._build_security_group_rule(security_group_id, direction, rule = self._build_security_group_rule(security_group_id, direction,
@ -729,21 +731,21 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
remote_ip_prefix) remote_ip_prefix)
res = self._create_security_group_rule(self.fmt, rule) res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 404) self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
def test_create_security_group_rule_bad_tenant(self): def test_create_security_group_rule_bad_tenant(self):
with self.security_group() as sg: with self.security_group() as sg:
rule = {'security_group_rule': rule = {'security_group_rule':
{'security_group_id': sg['security_group']['id'], {'security_group_id': sg['security_group']['id'],
'direction': 'ingress', 'direction': 'ingress',
'protocol': 'tcp', 'protocol': const.PROTO_NAME_TCP,
'port_range_min': '22', 'port_range_min': '22',
'port_range_max': '22', 'port_range_max': '22',
'tenant_id': "bad_tenant"}} 'tenant_id': "bad_tenant"}}
res = self._create_security_group_rule(self.fmt, rule) res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 404) self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
def test_create_security_group_rule_bad_tenant_remote_group_id(self): def test_create_security_group_rule_bad_tenant_remote_group_id(self):
with self.security_group() as sg: with self.security_group() as sg:
@ -754,7 +756,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
rule = {'security_group_rule': rule = {'security_group_rule':
{'security_group_id': sg2['security_group']['id'], {'security_group_id': sg2['security_group']['id'],
'direction': 'ingress', 'direction': 'ingress',
'protocol': 'tcp', 'protocol': const.PROTO_NAME_TCP,
'port_range_min': '22', 'port_range_min': '22',
'port_range_max': '22', 'port_range_max': '22',
'tenant_id': 'bad_tenant', 'tenant_id': 'bad_tenant',
@ -764,7 +766,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
tenant_id='bad_tenant', tenant_id='bad_tenant',
set_context=True) set_context=True)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 404) self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
def test_create_security_group_rule_bad_tenant_security_group_rule(self): def test_create_security_group_rule_bad_tenant_security_group_rule(self):
with self.security_group() as sg: with self.security_group() as sg:
@ -775,7 +777,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
rule = {'security_group_rule': rule = {'security_group_rule':
{'security_group_id': sg['security_group']['id'], {'security_group_id': sg['security_group']['id'],
'direction': 'ingress', 'direction': 'ingress',
'protocol': 'tcp', 'protocol': const.PROTO_NAME_TCP,
'port_range_min': '22', 'port_range_min': '22',
'port_range_max': '22', 'port_range_max': '22',
'tenant_id': 'bad_tenant'}} 'tenant_id': 'bad_tenant'}}
@ -784,7 +786,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
tenant_id='bad_tenant', tenant_id='bad_tenant',
set_context=True) set_context=True)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 404) self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
def test_create_security_group_rule_bad_remote_group_id(self): def test_create_security_group_rule_bad_remote_group_id(self):
name = 'webservers' name = 'webservers'
@ -793,7 +795,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id = sg['security_group']['id'] security_group_id = sg['security_group']['id']
remote_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087" remote_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
direction = "ingress" direction = "ingress"
protocol = 'tcp' protocol = const.PROTO_NAME_TCP
port_range_min = 22 port_range_min = 22
port_range_max = 22 port_range_max = 22
rule = self._build_security_group_rule(security_group_id, direction, rule = self._build_security_group_rule(security_group_id, direction,
@ -802,7 +804,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
remote_group_id=remote_group_id) remote_group_id=remote_group_id)
res = self._create_security_group_rule(self.fmt, rule) res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 404) self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
def test_create_security_group_rule_duplicate_rules(self): def test_create_security_group_rule_duplicate_rules(self):
name = 'webservers' name = 'webservers'
@ -811,11 +813,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id = sg['security_group']['id'] security_group_id = sg['security_group']['id']
with self.security_group_rule(security_group_id): with self.security_group_rule(security_group_id):
rule = self._build_security_group_rule( rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'tcp', '22', '22') sg['security_group']['id'], 'ingress',
const.PROTO_NAME_TCP, '22', '22')
self._create_security_group_rule(self.fmt, rule) self._create_security_group_rule(self.fmt, rule)
res = self._create_security_group_rule(self.fmt, rule) res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 409) self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
def test_create_security_group_rule_min_port_greater_max(self): def test_create_security_group_rule_min_port_greater_max(self):
name = 'webservers' name = 'webservers'
@ -823,14 +826,16 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
with self.security_group(name, description) as sg: with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id'] security_group_id = sg['security_group']['id']
with self.security_group_rule(security_group_id): with self.security_group_rule(security_group_id):
for protocol in ['tcp', 'udp', 6, 17]: for protocol in [const.PROTO_NAME_TCP, const.PROTO_NAME_UDP,
const.PROTO_NUM_TCP, const.PROTO_NUM_UDP]:
rule = self._build_security_group_rule( rule = self._build_security_group_rule(
sg['security_group']['id'], sg['security_group']['id'],
'ingress', protocol, '50', '22') 'ingress', protocol, '50', '22')
self._create_security_group_rule(self.fmt, rule) self._create_security_group_rule(self.fmt, rule)
res = self._create_security_group_rule(self.fmt, rule) res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400) self.assertEqual(res.status_int,
webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_ports_but_no_protocol(self): def test_create_security_group_rule_ports_but_no_protocol(self):
name = 'webservers' name = 'webservers'
@ -843,7 +848,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self._create_security_group_rule(self.fmt, rule) self._create_security_group_rule(self.fmt, rule)
res = self._create_security_group_rule(self.fmt, rule) res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400) self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_port_range_min_only(self): def test_create_security_group_rule_port_range_min_only(self):
name = 'webservers' name = 'webservers'
@ -852,11 +857,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id = sg['security_group']['id'] security_group_id = sg['security_group']['id']
with self.security_group_rule(security_group_id): with self.security_group_rule(security_group_id):
rule = self._build_security_group_rule( rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'tcp', '22', None) sg['security_group']['id'], 'ingress',
const.PROTO_NAME_TCP, '22', None)
self._create_security_group_rule(self.fmt, rule) self._create_security_group_rule(self.fmt, rule)
res = self._create_security_group_rule(self.fmt, rule) res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400) self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_port_range_max_only(self): def test_create_security_group_rule_port_range_max_only(self):
name = 'webservers' name = 'webservers'
@ -865,11 +871,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id = sg['security_group']['id'] security_group_id = sg['security_group']['id']
with self.security_group_rule(security_group_id): with self.security_group_rule(security_group_id):
rule = self._build_security_group_rule( rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'tcp', None, '22') sg['security_group']['id'], 'ingress',
const.PROTO_NAME_TCP, None, '22')
self._create_security_group_rule(self.fmt, rule) self._create_security_group_rule(self.fmt, rule)
res = self._create_security_group_rule(self.fmt, rule) res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400) self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_icmp_type_too_big(self): def test_create_security_group_rule_icmp_type_too_big(self):
name = 'webservers' name = 'webservers'
@ -878,11 +885,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id = sg['security_group']['id'] security_group_id = sg['security_group']['id']
with self.security_group_rule(security_group_id): with self.security_group_rule(security_group_id):
rule = self._build_security_group_rule( rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'icmp', '256', None) sg['security_group']['id'], 'ingress',
const.PROTO_NAME_ICMP, '256', None)
self._create_security_group_rule(self.fmt, rule) self._create_security_group_rule(self.fmt, rule)
res = self._create_security_group_rule(self.fmt, rule) res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400) self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_icmp_code_too_big(self): def test_create_security_group_rule_icmp_code_too_big(self):
name = 'webservers' name = 'webservers'
@ -891,11 +899,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_group_id = sg['security_group']['id'] security_group_id = sg['security_group']['id']
with self.security_group_rule(security_group_id): with self.security_group_rule(security_group_id):
rule = self._build_security_group_rule( rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'icmp', '8', '256') sg['security_group']['id'], 'ingress',
const.PROTO_NAME_ICMP, '8', '256')
self._create_security_group_rule(self.fmt, rule) self._create_security_group_rule(self.fmt, rule)
res = self._create_security_group_rule(self.fmt, rule) res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400) self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_list_ports_security_group(self): def test_list_ports_security_group(self):
with self.network() as n: with self.network() as n:
@ -1107,7 +1116,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_groups=['bad_id']) security_groups=['bad_id'])
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400) self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_delete_security_group_port_in_use(self): def test_create_delete_security_group_port_in_use(self):
with self.network() as n: with self.network() as n:
@ -1121,7 +1130,8 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
sg['security_group']['id']) sg['security_group']['id'])
# try to delete security group that's in use # try to delete security group that's in use
res = self._delete('security-groups', res = self._delete('security-groups',
sg['security_group']['id'], 409) sg['security_group']['id'],
webob.exc.HTTPConflict.code)
# delete the blocking port # delete the blocking port
self._delete('ports', port['port']['id']) self._delete('ports', port['port']['id'])
@ -1131,16 +1141,18 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
"security_group_rule create") "security_group_rule create")
with self.security_group() as sg: with self.security_group() as sg:
rule1 = self._build_security_group_rule(sg['security_group']['id'], rule1 = self._build_security_group_rule(sg['security_group']['id'],
'ingress', 'tcp', '22', 'ingress',
const.PROTO_NAME_TCP, '22',
'22', '10.0.0.1/24') '22', '10.0.0.1/24')
rule2 = self._build_security_group_rule(sg['security_group']['id'], rule2 = self._build_security_group_rule(sg['security_group']['id'],
'ingress', 'tcp', '23', 'ingress',
const.PROTO_NAME_TCP, '23',
'23', '10.0.0.1/24') '23', '10.0.0.1/24')
rules = {'security_group_rules': [rule1['security_group_rule'], rules = {'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]} rule2['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules) res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 201) self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
def test_create_security_group_rule_bulk_emulated(self): def test_create_security_group_rule_bulk_emulated(self):
real_has_attr = hasattr real_has_attr = hasattr
@ -1155,17 +1167,17 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
new=fakehasattr): new=fakehasattr):
with self.security_group() as sg: with self.security_group() as sg:
rule1 = self._build_security_group_rule( rule1 = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'tcp', '22', '22', sg['security_group']['id'], 'ingress',
'10.0.0.1/24') const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24')
rule2 = self._build_security_group_rule( rule2 = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'tcp', '23', '23', sg['security_group']['id'], 'ingress',
'10.0.0.1/24') const.PROTO_NAME_TCP, '23', '23', '10.0.0.1/24')
rules = {'security_group_rules': [rule1['security_group_rule'], rules = {'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']] rule2['security_group_rule']]
} }
res = self._create_security_group_rule(self.fmt, rules) res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 201) self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
def test_create_security_group_rule_duplicate_rule_in_post(self): def test_create_security_group_rule_duplicate_rule_in_post(self):
if self._skip_native_bulk: if self._skip_native_bulk:
@ -1173,13 +1185,14 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
"security_group_rule create") "security_group_rule create")
with self.security_group() as sg: with self.security_group() as sg:
rule = self._build_security_group_rule(sg['security_group']['id'], rule = self._build_security_group_rule(sg['security_group']['id'],
'ingress', 'tcp', '22', 'ingress',
const.PROTO_NAME_TCP, '22',
'22', '10.0.0.1/24') '22', '10.0.0.1/24')
rules = {'security_group_rules': [rule['security_group_rule'], rules = {'security_group_rules': [rule['security_group_rule'],
rule['security_group_rule']]} rule['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules) res = self._create_security_group_rule(self.fmt, rules)
rule = self.deserialize(self.fmt, res) rule = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 409) self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
def test_create_security_group_rule_duplicate_rule_in_post_emulated(self): def test_create_security_group_rule_duplicate_rule_in_post_emulated(self):
real_has_attr = hasattr real_has_attr = hasattr
@ -1195,13 +1208,13 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
with self.security_group() as sg: with self.security_group() as sg:
rule = self._build_security_group_rule( rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'tcp', '22', '22', sg['security_group']['id'], 'ingress',
'10.0.0.1/24') const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24')
rules = {'security_group_rules': [rule['security_group_rule'], rules = {'security_group_rules': [rule['security_group_rule'],
rule['security_group_rule']]} rule['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules) res = self._create_security_group_rule(self.fmt, rules)
rule = self.deserialize(self.fmt, res) rule = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 409) self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
def test_create_security_group_rule_duplicate_rule_db(self): def test_create_security_group_rule_duplicate_rule_db(self):
if self._skip_native_bulk: if self._skip_native_bulk:
@ -1209,13 +1222,14 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
"security_group_rule create") "security_group_rule create")
with self.security_group() as sg: with self.security_group() as sg:
rule = self._build_security_group_rule(sg['security_group']['id'], rule = self._build_security_group_rule(sg['security_group']['id'],
'ingress', 'tcp', '22', 'ingress',
const.PROTO_NAME_TCP, '22',
'22', '10.0.0.1/24') '22', '10.0.0.1/24')
rules = {'security_group_rules': [rule]} rules = {'security_group_rules': [rule]}
self._create_security_group_rule(self.fmt, rules) self._create_security_group_rule(self.fmt, rules)
res = self._create_security_group_rule(self.fmt, rules) res = self._create_security_group_rule(self.fmt, rules)
rule = self.deserialize(self.fmt, res) rule = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 409) self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
def test_create_security_group_rule_duplicate_rule_db_emulated(self): def test_create_security_group_rule_duplicate_rule_db_emulated(self):
real_has_attr = hasattr real_has_attr = hasattr
@ -1230,13 +1244,13 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
new=fakehasattr): new=fakehasattr):
with self.security_group() as sg: with self.security_group() as sg:
rule = self._build_security_group_rule( rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress', 'tcp', '22', '22', sg['security_group']['id'], 'ingress',
'10.0.0.1/24') const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24')
rules = {'security_group_rules': [rule]} rules = {'security_group_rules': [rule]}
self._create_security_group_rule(self.fmt, rules) self._create_security_group_rule(self.fmt, rules)
res = self._create_security_group_rule(self.fmt, rule) res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 409) self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
def test_create_security_group_rule_differnt_security_group_ids(self): def test_create_security_group_rule_differnt_security_group_ids(self):
if self._skip_native_bulk: if self._skip_native_bulk:
@ -1245,24 +1259,24 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
with self.security_group() as sg1: with self.security_group() as sg1:
with self.security_group() as sg2: with self.security_group() as sg2:
rule1 = self._build_security_group_rule( rule1 = self._build_security_group_rule(
sg1['security_group']['id'], 'ingress', 'tcp', '22', '22', sg1['security_group']['id'], 'ingress',
'10.0.0.1/24') const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24')
rule2 = self._build_security_group_rule( rule2 = self._build_security_group_rule(
sg2['security_group']['id'], 'ingress', 'tcp', '23', '23', sg2['security_group']['id'], 'ingress',
'10.0.0.1/24') const.PROTO_NAME_TCP, '23', '23', '10.0.0.1/24')
rules = {'security_group_rules': [rule1['security_group_rule'], rules = {'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']] rule2['security_group_rule']]
} }
res = self._create_security_group_rule(self.fmt, rules) res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400) self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_with_invalid_ethertype(self): def test_create_security_group_rule_with_invalid_ethertype(self):
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087" security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
direction = "ingress" direction = "ingress"
remote_ip_prefix = "10.0.0.0/24" remote_ip_prefix = "10.0.0.0/24"
protocol = 'tcp' protocol = const.PROTO_NAME_TCP
port_range_min = 22 port_range_min = 22
port_range_max = 22 port_range_max = 22
remote_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087" remote_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087"
@ -1274,7 +1288,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
ethertype='IPv5') ethertype='IPv5')
res = self._create_security_group_rule(self.fmt, rule) res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400) self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_with_invalid_protocol(self): def test_create_security_group_rule_with_invalid_protocol(self):
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087" security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
@ -1291,7 +1305,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
remote_group_id) remote_group_id)
res = self._create_security_group_rule(self.fmt, rule) res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400) self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_port_with_non_uuid(self): def test_create_port_with_non_uuid(self):
with self.network() as n: with self.network() as n:
@ -1300,7 +1314,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
security_groups=['not_valid']) security_groups=['not_valid'])
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400) self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
class TestSecurityGroupsXML(TestSecurityGroups): class TestSecurityGroupsXML(TestSecurityGroups):

View File

@ -21,11 +21,13 @@ import mock
from mock import call from mock import call
import mox import mox
from oslo.config import cfg from oslo.config import cfg
import webob.exc
from neutron.agent import firewall as firewall_base from neutron.agent import firewall as firewall_base
from neutron.agent.linux import iptables_manager from neutron.agent.linux import iptables_manager
from neutron.agent import rpc as agent_rpc from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import constants as const
from neutron import context from neutron import context
from neutron.db import securitygroups_rpc_base as sg_db_rpc from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.extensions import allowedaddresspairs as addr_pair from neutron.extensions import allowedaddresspairs as addr_pair
@ -54,7 +56,7 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
self.rpc = FakeSGCallback() self.rpc = FakeSGCallback()
def test_security_group_rules_for_devices_ipv4_ingress(self): def test_security_group_rules_for_devices_ipv4_ingress(self):
fake_prefix = test_fw.FAKE_PREFIX['IPv4'] fake_prefix = test_fw.FAKE_PREFIX[const.IPv4]
with self.network() as n: with self.network() as n:
with nested(self.subnet(n), with nested(self.subnet(n),
self.security_group()) as (subnet_v4, self.security_group()) as (subnet_v4,
@ -62,18 +64,18 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
sg1_id = sg1['security_group']['id'] sg1_id = sg1['security_group']['id']
rule1 = self._build_security_group_rule( rule1 = self._build_security_group_rule(
sg1_id, sg1_id,
'ingress', 'tcp', '22', 'ingress', const.PROTO_NAME_TCP, '22',
'22') '22')
rule2 = self._build_security_group_rule( rule2 = self._build_security_group_rule(
sg1_id, sg1_id,
'ingress', 'tcp', '23', 'ingress', const.PROTO_NAME_TCP, '23',
'23', fake_prefix) '23', fake_prefix)
rules = { rules = {
'security_group_rules': [rule1['security_group_rule'], 'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]} rule2['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules) res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 201) self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
res1 = self._create_port( res1 = self._create_port(
self.fmt, n['network']['id'], self.fmt, n['network']['id'],
@ -86,17 +88,19 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
ports_rpc = self.rpc.security_group_rules_for_devices( ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices) ctx, devices=devices)
port_rpc = ports_rpc[port_id1] port_rpc = ports_rpc[port_id1]
expected = [{'direction': 'egress', 'ethertype': 'IPv4', expected = [{'direction': 'egress', 'ethertype': const.IPv4,
'security_group_id': sg1_id}, 'security_group_id': sg1_id},
{'direction': 'egress', 'ethertype': 'IPv6', {'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg1_id}, 'security_group_id': sg1_id},
{'direction': 'ingress', {'direction': 'ingress',
'protocol': 'tcp', 'ethertype': 'IPv4', 'protocol': const.PROTO_NAME_TCP,
'ethertype': const.IPv4,
'port_range_max': 22, 'port_range_max': 22,
'security_group_id': sg1_id, 'security_group_id': sg1_id,
'port_range_min': 22}, 'port_range_min': 22},
{'direction': 'ingress', 'protocol': 'tcp', {'direction': 'ingress',
'ethertype': 'IPv4', 'protocol': const.PROTO_NAME_TCP,
'ethertype': const.IPv4,
'port_range_max': 23, 'security_group_id': sg1_id, 'port_range_max': 23, 'security_group_id': sg1_id,
'port_range_min': 23, 'port_range_min': 23,
'source_ip_prefix': fake_prefix}, 'source_ip_prefix': fake_prefix},
@ -169,7 +173,7 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
self._delete('ports', port_id1) self._delete('ports', port_id1)
def test_security_group_rules_for_devices_ipv4_egress(self): def test_security_group_rules_for_devices_ipv4_egress(self):
fake_prefix = test_fw.FAKE_PREFIX['IPv4'] fake_prefix = test_fw.FAKE_PREFIX[const.IPv4]
with self.network() as n: with self.network() as n:
with nested(self.subnet(n), with nested(self.subnet(n),
self.security_group()) as (subnet_v4, self.security_group()) as (subnet_v4,
@ -177,18 +181,18 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
sg1_id = sg1['security_group']['id'] sg1_id = sg1['security_group']['id']
rule1 = self._build_security_group_rule( rule1 = self._build_security_group_rule(
sg1_id, sg1_id,
'egress', 'tcp', '22', 'egress', const.PROTO_NAME_TCP, '22',
'22') '22')
rule2 = self._build_security_group_rule( rule2 = self._build_security_group_rule(
sg1_id, sg1_id,
'egress', 'udp', '23', 'egress', const.PROTO_NAME_UDP, '23',
'23', fake_prefix) '23', fake_prefix)
rules = { rules = {
'security_group_rules': [rule1['security_group_rule'], 'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]} rule2['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules) res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 201) self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
res1 = self._create_port( res1 = self._create_port(
self.fmt, n['network']['id'], self.fmt, n['network']['id'],
@ -201,17 +205,19 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
ports_rpc = self.rpc.security_group_rules_for_devices( ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices) ctx, devices=devices)
port_rpc = ports_rpc[port_id1] port_rpc = ports_rpc[port_id1]
expected = [{'direction': 'egress', 'ethertype': 'IPv4', expected = [{'direction': 'egress', 'ethertype': const.IPv4,
'security_group_id': sg1_id}, 'security_group_id': sg1_id},
{'direction': 'egress', 'ethertype': 'IPv6', {'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg1_id}, 'security_group_id': sg1_id},
{'direction': 'egress', {'direction': 'egress',
'protocol': 'tcp', 'ethertype': 'IPv4', 'protocol': const.PROTO_NAME_TCP,
'ethertype': const.IPv4,
'port_range_max': 22, 'port_range_max': 22,
'security_group_id': sg1_id, 'security_group_id': sg1_id,
'port_range_min': 22}, 'port_range_min': 22},
{'direction': 'egress', 'protocol': 'udp', {'direction': 'egress',
'ethertype': 'IPv4', 'protocol': const.PROTO_NAME_UDP,
'ethertype': const.IPv4,
'port_range_max': 23, 'security_group_id': sg1_id, 'port_range_max': 23, 'security_group_id': sg1_id,
'port_range_min': 23, 'port_range_min': 23,
'dest_ip_prefix': fake_prefix}, 'dest_ip_prefix': fake_prefix},
@ -232,13 +238,13 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
sg2_id = sg2['security_group']['id'] sg2_id = sg2['security_group']['id']
rule1 = self._build_security_group_rule( rule1 = self._build_security_group_rule(
sg1_id, sg1_id,
'ingress', 'tcp', '24', 'ingress', const.PROTO_NAME_TCP, '24',
'25', remote_group_id=sg2['security_group']['id']) '25', remote_group_id=sg2['security_group']['id'])
rules = { rules = {
'security_group_rules': [rule1['security_group_rule']]} 'security_group_rules': [rule1['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules) res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 201) self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
res1 = self._create_port( res1 = self._create_port(
self.fmt, n['network']['id'], self.fmt, n['network']['id'],
@ -258,17 +264,18 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
ports_rpc = self.rpc.security_group_rules_for_devices( ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices) ctx, devices=devices)
port_rpc = ports_rpc[port_id1] port_rpc = ports_rpc[port_id1]
expected = [{'direction': 'egress', 'ethertype': 'IPv4', expected = [{'direction': 'egress', 'ethertype': const.IPv4,
'security_group_id': sg1_id}, 'security_group_id': sg1_id},
{'direction': 'egress', 'ethertype': 'IPv6', {'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg1_id}, 'security_group_id': sg1_id},
{'direction': 'egress', 'ethertype': 'IPv4', {'direction': 'egress', 'ethertype': const.IPv4,
'security_group_id': sg2_id}, 'security_group_id': sg2_id},
{'direction': 'egress', 'ethertype': 'IPv6', {'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg2_id}, 'security_group_id': sg2_id},
{'direction': u'ingress', {'direction': u'ingress',
'source_ip_prefix': u'10.0.0.3/32', 'source_ip_prefix': u'10.0.0.3/32',
'protocol': u'tcp', 'ethertype': u'IPv4', 'protocol': const.PROTO_NAME_TCP,
'ethertype': const.IPv4,
'port_range_max': 25, 'port_range_min': 24, 'port_range_max': 25, 'port_range_min': 24,
'remote_group_id': sg2_id, 'remote_group_id': sg2_id,
'security_group_id': sg1_id}, 'security_group_id': sg1_id},
@ -279,7 +286,7 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
self._delete('ports', port_id2) self._delete('ports', port_id2)
def test_security_group_rules_for_devices_ipv6_ingress(self): def test_security_group_rules_for_devices_ipv6_ingress(self):
fake_prefix = test_fw.FAKE_PREFIX['IPv6'] fake_prefix = test_fw.FAKE_PREFIX[const.IPv6]
with self.network() as n: with self.network() as n:
with nested(self.subnet(n, with nested(self.subnet(n,
cidr=fake_prefix, cidr=fake_prefix,
@ -289,20 +296,20 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
sg1_id = sg1['security_group']['id'] sg1_id = sg1['security_group']['id']
rule1 = self._build_security_group_rule( rule1 = self._build_security_group_rule(
sg1_id, sg1_id,
'ingress', 'tcp', '22', 'ingress', const.PROTO_NAME_TCP, '22',
'22', '22',
ethertype='IPv6') ethertype=const.IPv6)
rule2 = self._build_security_group_rule( rule2 = self._build_security_group_rule(
sg1_id, sg1_id,
'ingress', 'udp', '23', 'ingress', const.PROTO_NAME_UDP, '23',
'23', fake_prefix, '23', fake_prefix,
ethertype='IPv6') ethertype=const.IPv6)
rules = { rules = {
'security_group_rules': [rule1['security_group_rule'], 'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]} rule2['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules) res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 201) self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
res1 = self._create_port( res1 = self._create_port(
self.fmt, n['network']['id'], self.fmt, n['network']['id'],
@ -316,17 +323,19 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
ports_rpc = self.rpc.security_group_rules_for_devices( ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices) ctx, devices=devices)
port_rpc = ports_rpc[port_id1] port_rpc = ports_rpc[port_id1]
expected = [{'direction': 'egress', 'ethertype': 'IPv4', expected = [{'direction': 'egress', 'ethertype': const.IPv4,
'security_group_id': sg1_id}, 'security_group_id': sg1_id},
{'direction': 'egress', 'ethertype': 'IPv6', {'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg1_id}, 'security_group_id': sg1_id},
{'direction': 'ingress', {'direction': 'ingress',
'protocol': 'tcp', 'ethertype': 'IPv6', 'protocol': const.PROTO_NAME_TCP,
'ethertype': const.IPv6,
'port_range_max': 22, 'port_range_max': 22,
'security_group_id': sg1_id, 'security_group_id': sg1_id,
'port_range_min': 22}, 'port_range_min': 22},
{'direction': 'ingress', 'protocol': 'udp', {'direction': 'ingress',
'ethertype': 'IPv6', 'protocol': const.PROTO_NAME_UDP,
'ethertype': const.IPv6,
'port_range_max': 23, 'security_group_id': sg1_id, 'port_range_max': 23, 'security_group_id': sg1_id,
'port_range_min': 23, 'port_range_min': 23,
'source_ip_prefix': fake_prefix}, 'source_ip_prefix': fake_prefix},
@ -336,7 +345,7 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
self._delete('ports', port_id1) self._delete('ports', port_id1)
def test_security_group_rules_for_devices_ipv6_egress(self): def test_security_group_rules_for_devices_ipv6_egress(self):
fake_prefix = test_fw.FAKE_PREFIX['IPv6'] fake_prefix = test_fw.FAKE_PREFIX[const.IPv6]
with self.network() as n: with self.network() as n:
with nested(self.subnet(n, with nested(self.subnet(n,
cidr=fake_prefix, cidr=fake_prefix,
@ -346,20 +355,20 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
sg1_id = sg1['security_group']['id'] sg1_id = sg1['security_group']['id']
rule1 = self._build_security_group_rule( rule1 = self._build_security_group_rule(
sg1_id, sg1_id,
'egress', 'tcp', '22', 'egress', const.PROTO_NAME_TCP, '22',
'22', '22',
ethertype='IPv6') ethertype=const.IPv6)
rule2 = self._build_security_group_rule( rule2 = self._build_security_group_rule(
sg1_id, sg1_id,
'egress', 'udp', '23', 'egress', const.PROTO_NAME_UDP, '23',
'23', fake_prefix, '23', fake_prefix,
ethertype='IPv6') ethertype=const.IPv6)
rules = { rules = {
'security_group_rules': [rule1['security_group_rule'], 'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]} rule2['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules) res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 201) self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
res1 = self._create_port( res1 = self._create_port(
self.fmt, n['network']['id'], self.fmt, n['network']['id'],
@ -374,18 +383,21 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
ports_rpc = self.rpc.security_group_rules_for_devices( ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices) ctx, devices=devices)
port_rpc = ports_rpc[port_id1] port_rpc = ports_rpc[port_id1]
expected = [{'direction': 'egress', 'ethertype': 'IPv4', expected = [{'direction': 'egress', 'ethertype': const.IPv4,
'security_group_id': sg1_id}, 'security_group_id': sg1_id},
{'direction': 'egress', 'ethertype': 'IPv6', {'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg1_id}, 'security_group_id': sg1_id},
{'direction': 'egress', {'direction': 'egress',
'protocol': 'tcp', 'ethertype': 'IPv6', 'protocol': const.PROTO_NAME_TCP,
'ethertype': const.IPv6,
'port_range_max': 22, 'port_range_max': 22,
'security_group_id': sg1_id, 'security_group_id': sg1_id,
'port_range_min': 22}, 'port_range_min': 22},
{'direction': 'egress', 'protocol': 'udp', {'direction': 'egress',
'ethertype': 'IPv6', 'protocol': const.PROTO_NAME_UDP,
'port_range_max': 23, 'security_group_id': sg1_id, 'ethertype': const.IPv6,
'port_range_max': 23,
'security_group_id': sg1_id,
'port_range_min': 23, 'port_range_min': 23,
'dest_ip_prefix': fake_prefix}, 'dest_ip_prefix': fake_prefix},
] ]
@ -394,7 +406,7 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
self._delete('ports', port_id1) self._delete('ports', port_id1)
def test_security_group_rules_for_devices_ipv6_source_group(self): def test_security_group_rules_for_devices_ipv6_source_group(self):
fake_prefix = test_fw.FAKE_PREFIX['IPv6'] fake_prefix = test_fw.FAKE_PREFIX[const.IPv6]
with self.network() as n: with self.network() as n:
with nested(self.subnet(n, with nested(self.subnet(n,
cidr=fake_prefix, cidr=fake_prefix,
@ -407,15 +419,15 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
sg2_id = sg2['security_group']['id'] sg2_id = sg2['security_group']['id']
rule1 = self._build_security_group_rule( rule1 = self._build_security_group_rule(
sg1_id, sg1_id,
'ingress', 'tcp', '24', 'ingress', const.PROTO_NAME_TCP, '24',
'25', '25',
ethertype='IPv6', ethertype=const.IPv6,
remote_group_id=sg2['security_group']['id']) remote_group_id=sg2['security_group']['id'])
rules = { rules = {
'security_group_rules': [rule1['security_group_rule']]} 'security_group_rules': [rule1['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules) res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 201) self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
res1 = self._create_port( res1 = self._create_port(
self.fmt, n['network']['id'], self.fmt, n['network']['id'],
@ -438,17 +450,18 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
ports_rpc = self.rpc.security_group_rules_for_devices( ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices) ctx, devices=devices)
port_rpc = ports_rpc[port_id1] port_rpc = ports_rpc[port_id1]
expected = [{'direction': 'egress', 'ethertype': 'IPv4', expected = [{'direction': 'egress', 'ethertype': const.IPv4,
'security_group_id': sg1_id}, 'security_group_id': sg1_id},
{'direction': 'egress', 'ethertype': 'IPv6', {'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg1_id}, 'security_group_id': sg1_id},
{'direction': 'egress', 'ethertype': 'IPv4', {'direction': 'egress', 'ethertype': const.IPv4,
'security_group_id': sg2_id}, 'security_group_id': sg2_id},
{'direction': 'egress', 'ethertype': 'IPv6', {'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg2_id}, 'security_group_id': sg2_id},
{'direction': 'ingress', {'direction': 'ingress',
'source_ip_prefix': 'fe80::3/128', 'source_ip_prefix': 'fe80::3/128',
'protocol': 'tcp', 'ethertype': 'IPv6', 'protocol': const.PROTO_NAME_TCP,
'ethertype': const.IPv6,
'port_range_max': 25, 'port_range_min': 24, 'port_range_max': 25, 'port_range_min': 24,
'remote_group_id': sg2_id, 'remote_group_id': sg2_id,
'security_group_id': sg1_id}, 'security_group_id': sg1_id},
@ -1220,36 +1233,36 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
self.rpc = mock.Mock() self.rpc = mock.Mock()
self.agent.plugin_rpc = self.rpc self.agent.plugin_rpc = self.rpc
rule1 = [{'direction': 'ingress', rule1 = [{'direction': 'ingress',
'protocol': 'udp', 'protocol': const.PROTO_NAME_UDP,
'ethertype': 'IPv4', 'ethertype': const.IPv4,
'source_ip_prefix': '10.0.0.2', 'source_ip_prefix': '10.0.0.2',
'source_port_range_min': 67, 'source_port_range_min': 67,
'source_port_range_max': 67, 'source_port_range_max': 67,
'port_range_min': 68, 'port_range_min': 68,
'port_range_max': 68}, 'port_range_max': 68},
{'direction': 'ingress', {'direction': 'ingress',
'protocol': 'tcp', 'protocol': const.PROTO_NAME_TCP,
'ethertype': 'IPv4', 'ethertype': const.IPv4,
'port_range_min': 22, 'port_range_min': 22,
'port_range_max': 22}, 'port_range_max': 22},
{'direction': 'egress', {'direction': 'egress',
'ethertype': 'IPv4'}] 'ethertype': const.IPv4}]
rule2 = rule1[:] rule2 = rule1[:]
rule2 += [{'direction': 'ingress', rule2 += [{'direction': 'ingress',
'source_ip_prefix': '10.0.0.4', 'source_ip_prefix': '10.0.0.4',
'ethertype': 'IPv4'}] 'ethertype': const.IPv4}]
rule3 = rule2[:] rule3 = rule2[:]
rule3 += [{'direction': 'ingress', rule3 += [{'direction': 'ingress',
'protocol': 'icmp', 'protocol': const.PROTO_NAME_ICMP,
'ethertype': 'IPv4'}] 'ethertype': const.IPv4}]
rule4 = rule1[:] rule4 = rule1[:]
rule4 += [{'direction': 'ingress', rule4 += [{'direction': 'ingress',
'source_ip_prefix': '10.0.0.3', 'source_ip_prefix': '10.0.0.3',
'ethertype': 'IPv4'}] 'ethertype': const.IPv4}]
rule5 = rule4[:] rule5 = rule4[:]
rule5 += [{'direction': 'ingress', rule5 += [{'direction': 'ingress',
'protocol': 'icmp', 'protocol': const.PROTO_NAME_ICMP,
'ethertype': 'IPv4'}] 'ethertype': const.IPv4}]
self.devices1 = {'tap_port1': self._device('tap_port1', self.devices1 = {'tap_port1': self._device('tap_port1',
'10.0.0.3', '10.0.0.3',
'12:34:56:78:9a:bc', '12:34:56:78:9a:bc',
@ -1360,7 +1373,7 @@ class SGNotificationTestMixin():
security_group_id = sg['security_group']['id'] security_group_id = sg['security_group']['id']
direction = "ingress" direction = "ingress"
remote_group_id = sg2['security_group']['id'] remote_group_id = sg2['security_group']['id']
protocol = 'tcp' protocol = const.PROTO_NAME_TCP
port_range_min = 88 port_range_min = 88
port_range_max = 88 port_range_max = 88
with self.security_group_rule(security_group_id, direction, with self.security_group_rule(security_group_id, direction,