Change hard coded numbers to constants in security group tests
Security groups tests in Neutron contain a lot of hard coded HTTP return codes and protocol numbers. These should be changed to use constants. Change-Id: Ibecff3821c54f12848a05648f35381a0c73a0896 Fixes: Bug 1218928
This commit is contained in:
parent
be8334ec65
commit
2e375d0670
@ -35,11 +35,6 @@ METERING_LABEL_KEY = '_metering_labels'
|
||||
IPv4 = 'IPv4'
|
||||
IPv6 = 'IPv6'
|
||||
|
||||
ICMP_PROTOCOL = 1
|
||||
TCP_PROTOCOL = 6
|
||||
UDP_PROTOCOL = 17
|
||||
ICMPv6_PROTOCOL = 58
|
||||
|
||||
DHCP_RESPONSE_PORT = 68
|
||||
|
||||
MIN_VLAN_TAG = 1
|
||||
@ -87,3 +82,13 @@ PORT_BINDING_EXT_ALIAS = 'binding'
|
||||
L3_AGENT_SCHEDULER_EXT_ALIAS = 'l3_agent_scheduler'
|
||||
DHCP_AGENT_SCHEDULER_EXT_ALIAS = 'dhcp_agent_scheduler'
|
||||
LBAAS_AGENT_SCHEDULER_EXT_ALIAS = 'lbaas_agent_scheduler'
|
||||
|
||||
# Protocol names and numbers for Security Groups/Firewalls
|
||||
PROTO_NAME_TCP = 'tcp'
|
||||
PROTO_NAME_ICMP = 'icmp'
|
||||
PROTO_NAME_ICMP_V6 = 'icmpv6'
|
||||
PROTO_NAME_UDP = 'udp'
|
||||
PROTO_NUM_TCP = 6
|
||||
PROTO_NUM_ICMP = 1
|
||||
PROTO_NUM_ICMP_V6 = 58
|
||||
PROTO_NUM_UDP = 17
|
||||
|
@ -30,10 +30,10 @@ from neutron.extensions import securitygroup as ext_sg
|
||||
from neutron.openstack.common import uuidutils
|
||||
|
||||
|
||||
IP_PROTOCOL_MAP = {'tcp': constants.TCP_PROTOCOL,
|
||||
'udp': constants.UDP_PROTOCOL,
|
||||
'icmp': constants.ICMP_PROTOCOL,
|
||||
'icmpv6': constants.ICMPv6_PROTOCOL}
|
||||
IP_PROTOCOL_MAP = {constants.PROTO_NAME_TCP: constants.PROTO_NUM_TCP,
|
||||
constants.PROTO_NAME_UDP: constants.PROTO_NUM_UDP,
|
||||
constants.PROTO_NAME_ICMP: constants.PROTO_NUM_ICMP,
|
||||
constants.PROTO_NAME_ICMP_V6: constants.PROTO_NUM_ICMP_V6}
|
||||
|
||||
|
||||
class SecurityGroup(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
|
||||
@ -304,13 +304,13 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
|
||||
if not rule['protocol']:
|
||||
raise ext_sg.SecurityGroupProtocolRequiredWithPorts()
|
||||
ip_proto = self._get_ip_proto_number(rule['protocol'])
|
||||
if ip_proto in [constants.TCP_PROTOCOL, constants.UDP_PROTOCOL]:
|
||||
if ip_proto in [constants.PROTO_NUM_TCP, constants.PROTO_NUM_UDP]:
|
||||
if (rule['port_range_min'] is not None and
|
||||
rule['port_range_min'] <= rule['port_range_max']):
|
||||
pass
|
||||
else:
|
||||
raise ext_sg.SecurityGroupInvalidPortRange()
|
||||
elif ip_proto == constants.ICMP_PROTOCOL:
|
||||
elif ip_proto == constants.PROTO_NUM_ICMP:
|
||||
for attr, field in [('port_range_min', 'type'),
|
||||
('port_range_max', 'code')]:
|
||||
if rule[attr] > 255:
|
||||
|
@ -23,6 +23,7 @@ from oslo.config import cfg
|
||||
from neutron.api import extensions
|
||||
from neutron.api.v2 import attributes as attr
|
||||
from neutron.api.v2 import base
|
||||
from neutron.common import constants as const
|
||||
from neutron.common import exceptions as qexception
|
||||
from neutron import manager
|
||||
from neutron.openstack.common import uuidutils
|
||||
@ -158,7 +159,8 @@ def _validate_name_not_default(data, valid_values=None):
|
||||
|
||||
attr.validators['type:name_not_default'] = _validate_name_not_default
|
||||
|
||||
sg_supported_protocols = [None, 'tcp', 'udp', 'icmp']
|
||||
sg_supported_protocols = [None, const.PROTO_NAME_TCP,
|
||||
const.PROTO_NAME_UDP, const.PROTO_NAME_ICMP]
|
||||
sg_supported_ethertypes = ['IPv4', 'IPv6']
|
||||
|
||||
# Attribute Map
|
||||
|
@ -61,8 +61,8 @@ def get_protocol_value(protocol):
|
||||
return protocol
|
||||
|
||||
mapping = {
|
||||
'tcp': constants.TCP_PROTOCOL,
|
||||
'udp': constants.UDP_PROTOCOL,
|
||||
'icmp': constants.ICMP_PROTOCOL
|
||||
constants.PROTO_NAME_TCP: constants.PROTO_NUM_TCP,
|
||||
constants.PROTO_NAME_UDP: constants.PROTO_NUM_UDP,
|
||||
constants.PROTO_NAME_ICMP: constants.PROTO_NUM_ICMP
|
||||
}
|
||||
return mapping.get(protocol.lower())
|
||||
|
@ -1073,7 +1073,7 @@ def create_security_profile(cluster, tenant_id, security_profile):
|
||||
# Allow all dhcp responses and all ingress traffic
|
||||
hidden_rules = {'logical_port_egress_rules':
|
||||
[{'ethertype': 'IPv4',
|
||||
'protocol': constants.UDP_PROTOCOL,
|
||||
'protocol': constants.PROTO_NUM_UDP,
|
||||
'port_range_min': constants.DHCP_RESPONSE_PORT,
|
||||
'port_range_max': constants.DHCP_RESPONSE_PORT,
|
||||
'ip_prefix': '0.0.0.0/0'}],
|
||||
@ -1111,7 +1111,7 @@ def update_security_group_rules(cluster, spid, rules):
|
||||
|
||||
# Allow all dhcp responses in
|
||||
rules['logical_port_egress_rules'].append(
|
||||
{'ethertype': 'IPv4', 'protocol': constants.UDP_PROTOCOL,
|
||||
{'ethertype': 'IPv4', 'protocol': constants.PROTO_NUM_UDP,
|
||||
'port_range_min': constants.DHCP_RESPONSE_PORT,
|
||||
'port_range_max': constants.DHCP_RESPONSE_PORT,
|
||||
'ip_prefix': '0.0.0.0/0'})
|
||||
|
@ -20,6 +20,7 @@ import mock
|
||||
import webob.exc
|
||||
|
||||
from neutron.api.v2 import attributes as attr
|
||||
from neutron.common import constants as const
|
||||
from neutron.common.test_lib import test_config
|
||||
from neutron import context
|
||||
from neutron.db import db_base_plugin_v2
|
||||
@ -75,7 +76,7 @@ class SecurityGroupsTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
|
||||
port_range_min=None, port_range_max=None,
|
||||
remote_ip_prefix=None, remote_group_id=None,
|
||||
tenant_id='test_tenant',
|
||||
ethertype='IPv4'):
|
||||
ethertype=const.IPv4):
|
||||
|
||||
data = {'security_group_rule': {'security_group_id': security_group_id,
|
||||
'direction': direction,
|
||||
@ -110,13 +111,13 @@ class SecurityGroupsTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
|
||||
|
||||
def _make_security_group(self, fmt, name, description, **kwargs):
|
||||
res = self._create_security_group(fmt, name, description, **kwargs)
|
||||
if res.status_int >= 400:
|
||||
if res.status_int >= webob.exc.HTTPBadRequest.code:
|
||||
raise webob.exc.HTTPClientError(code=res.status_int)
|
||||
return self.deserialize(fmt, res)
|
||||
|
||||
def _make_security_group_rule(self, fmt, rules, **kwargs):
|
||||
res = self._create_security_group_rule(self.fmt, rules)
|
||||
if res.status_int >= 400:
|
||||
if res.status_int >= webob.exc.HTTPBadRequest.code:
|
||||
raise webob.exc.HTTPClientError(code=res.status_int)
|
||||
return self.deserialize(fmt, res)
|
||||
|
||||
@ -136,10 +137,10 @@ class SecurityGroupsTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
|
||||
@contextlib.contextmanager
|
||||
def security_group_rule(self, security_group_id='4cd70774-cc67-4a87-9b39-7'
|
||||
'd1db38eb087',
|
||||
direction='ingress', protocol='tcp',
|
||||
direction='ingress', protocol=const.PROTO_NAME_TCP,
|
||||
port_range_min='22', port_range_max='22',
|
||||
remote_ip_prefix=None, remote_group_id=None,
|
||||
fmt=None, no_delete=False, ethertype='IPv4'):
|
||||
fmt=None, no_delete=False, ethertype=const.IPv4):
|
||||
if not fmt:
|
||||
fmt = self.fmt
|
||||
rule = self._build_security_group_rule(security_group_id,
|
||||
@ -263,11 +264,11 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
sg_rules = security_group['security_group']['security_group_rules']
|
||||
self.assertEqual(len(sg_rules), 2)
|
||||
|
||||
v4_rules = filter(lambda x: x['ethertype'] == 'IPv4', sg_rules)
|
||||
v4_rules = [r for r in sg_rules if r['ethertype'] == const.IPv4]
|
||||
self.assertEqual(len(v4_rules), 1)
|
||||
v4_rule = v4_rules[0]
|
||||
expected = {'direction': 'egress',
|
||||
'ethertype': 'IPv4',
|
||||
'ethertype': const.IPv4,
|
||||
'remote_group_id': None,
|
||||
'remote_ip_prefix': None,
|
||||
'protocol': None,
|
||||
@ -275,11 +276,11 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
'port_range_min': None}
|
||||
self._assert_sg_rule_has_kvs(v4_rule, expected)
|
||||
|
||||
v6_rules = filter(lambda x: x['ethertype'] == 'IPv6', sg_rules)
|
||||
v6_rules = [r for r in sg_rules if r['ethertype'] == const.IPv6]
|
||||
self.assertEqual(len(v6_rules), 1)
|
||||
v6_rule = v6_rules[0]
|
||||
expected = {'direction': 'egress',
|
||||
'ethertype': 'IPv6',
|
||||
'ethertype': const.IPv6,
|
||||
'remote_group_id': None,
|
||||
'remote_ip_prefix': None,
|
||||
'protocol': None,
|
||||
@ -309,7 +310,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
sg['security_group']['id'])
|
||||
req.environ['neutron.context'] = context.Context('', 'somebody')
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(res.status_int, 409)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
|
||||
|
||||
def test_update_default_security_group_name_fail(self):
|
||||
with self.network():
|
||||
@ -322,7 +323,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
sg['security_groups'][0]['id'])
|
||||
req.environ['neutron.context'] = context.Context('', 'somebody')
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(res.status_int, 404)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
|
||||
|
||||
def test_update_default_security_group_with_description(self):
|
||||
with self.network():
|
||||
@ -347,7 +348,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
description = 'my webservers'
|
||||
res = self._create_security_group(self.fmt, name, description)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 409)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
|
||||
|
||||
def test_list_security_groups(self):
|
||||
with contextlib.nested(self.security_group(name='sg1',
|
||||
@ -406,23 +407,23 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
security_group_id = sg['security_group']['id']
|
||||
ethertype = 2
|
||||
rule = self._build_security_group_rule(
|
||||
security_group_id, 'ingress', 'tcp', '22', '22', None, None,
|
||||
ethertype=ethertype)
|
||||
security_group_id, 'ingress', const.PROTO_NAME_TCP, '22',
|
||||
'22', None, None, ethertype=ethertype)
|
||||
res = self._create_security_group_rule(self.fmt, rule)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 400)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
|
||||
|
||||
def test_create_security_group_rule_tcp_protocol_as_number(self):
|
||||
name = 'webservers'
|
||||
description = 'my webservers'
|
||||
with self.security_group(name, description) as sg:
|
||||
security_group_id = sg['security_group']['id']
|
||||
protocol = 6 # TCP
|
||||
protocol = const.PROTO_NUM_TCP # TCP
|
||||
rule = self._build_security_group_rule(
|
||||
security_group_id, 'ingress', protocol, '22', '22')
|
||||
res = self._create_security_group_rule(self.fmt, rule)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 201)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
|
||||
|
||||
def test_create_security_group_rule_protocol_as_number(self):
|
||||
name = 'webservers'
|
||||
@ -434,7 +435,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
security_group_id, 'ingress', protocol)
|
||||
res = self._create_security_group_rule(self.fmt, rule)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 201)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
|
||||
|
||||
def test_create_security_group_rule_case_insensitive(self):
|
||||
name = 'webservers'
|
||||
@ -457,7 +458,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
self.assertEqual(rule['security_group_rule']['protocol'],
|
||||
protocol.lower())
|
||||
self.assertEqual(rule['security_group_rule']['ethertype'],
|
||||
'IPv4')
|
||||
const.IPv4)
|
||||
|
||||
def test_get_security_group(self):
|
||||
name = 'webservers'
|
||||
@ -468,7 +469,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
security_group_id = sg['security_group']['id']
|
||||
direction = "ingress"
|
||||
remote_ip_prefix = "10.0.0.0/24"
|
||||
protocol = 'tcp'
|
||||
protocol = const.PROTO_NAME_TCP
|
||||
port_range_min = 22
|
||||
port_range_max = 22
|
||||
keys = [('remote_ip_prefix', remote_ip_prefix),
|
||||
@ -488,8 +489,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
self.assertEqual(group['security_group']['id'],
|
||||
remote_group_id)
|
||||
self.assertEqual(len(sg_rule), 3)
|
||||
sg_rule = filter(lambda x: x['direction'] == 'ingress',
|
||||
sg_rule)
|
||||
sg_rule = [r for r in sg_rule if r['direction'] == 'ingress']
|
||||
for k, v, in keys:
|
||||
self.assertEqual(sg_rule[0][k], v)
|
||||
|
||||
@ -498,14 +498,15 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
description = 'my webservers'
|
||||
with self.security_group(name, description, no_delete=True) as sg:
|
||||
remote_group_id = sg['security_group']['id']
|
||||
self._delete('security-groups', remote_group_id, 204)
|
||||
self._delete('security-groups', remote_group_id,
|
||||
webob.exc.HTTPNoContent.code)
|
||||
|
||||
def test_delete_default_security_group_admin(self):
|
||||
with self.network():
|
||||
res = self.new_list_request('security-groups')
|
||||
sg = self.deserialize(self.fmt, res.get_response(self.ext_api))
|
||||
self._delete('security-groups', sg['security_groups'][0]['id'],
|
||||
204)
|
||||
webob.exc.HTTPNoContent.code)
|
||||
|
||||
def test_delete_default_security_group_nonadmin(self):
|
||||
with self.network():
|
||||
@ -513,7 +514,8 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
sg = self.deserialize(self.fmt, res.get_response(self.ext_api))
|
||||
neutron_context = context.Context('', 'test-tenant')
|
||||
self._delete('security-groups', sg['security_groups'][0]['id'],
|
||||
409, neutron_context=neutron_context)
|
||||
webob.exc.HTTPConflict.code,
|
||||
neutron_context=neutron_context)
|
||||
|
||||
def test_security_group_list_creates_default_security_group(self):
|
||||
neutron_context = context.Context('', 'test-tenant')
|
||||
@ -533,15 +535,15 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
|
||||
# Verify default rule for v4 egress
|
||||
sg_rules = rules['security_group_rules']
|
||||
rules = filter(
|
||||
lambda x: (
|
||||
x['direction'] == 'egress' and x['ethertype'] == 'IPv4'),
|
||||
sg_rules)
|
||||
rules = [
|
||||
r for r in sg_rules
|
||||
if r['direction'] == 'egress' and r['ethertype'] == const.IPv4
|
||||
]
|
||||
self.assertEqual(len(rules), 1)
|
||||
v4_egress = rules[0]
|
||||
|
||||
expected = {'direction': 'egress',
|
||||
'ethertype': 'IPv4',
|
||||
'ethertype': const.IPv4,
|
||||
'remote_group_id': None,
|
||||
'remote_ip_prefix': None,
|
||||
'protocol': None,
|
||||
@ -550,15 +552,15 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
self._assert_sg_rule_has_kvs(v4_egress, expected)
|
||||
|
||||
# Verify default rule for v6 egress
|
||||
rules = filter(
|
||||
lambda x: (
|
||||
x['direction'] == 'egress' and x['ethertype'] == 'IPv6'),
|
||||
sg_rules)
|
||||
rules = [
|
||||
r for r in sg_rules
|
||||
if r['direction'] == 'egress' and r['ethertype'] == const.IPv6
|
||||
]
|
||||
self.assertEqual(len(rules), 1)
|
||||
v6_egress = rules[0]
|
||||
|
||||
expected = {'direction': 'egress',
|
||||
'ethertype': 'IPv6',
|
||||
'ethertype': const.IPv6,
|
||||
'remote_group_id': None,
|
||||
'remote_ip_prefix': None,
|
||||
'protocol': None,
|
||||
@ -567,15 +569,15 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
self._assert_sg_rule_has_kvs(v6_egress, expected)
|
||||
|
||||
# Verify default rule for v4 ingress
|
||||
rules = filter(
|
||||
lambda x: (
|
||||
x['direction'] == 'ingress' and x['ethertype'] == 'IPv4'),
|
||||
sg_rules)
|
||||
rules = [
|
||||
r for r in sg_rules
|
||||
if r['direction'] == 'ingress' and r['ethertype'] == const.IPv4
|
||||
]
|
||||
self.assertEqual(len(rules), 1)
|
||||
v4_ingress = rules[0]
|
||||
|
||||
expected = {'direction': 'ingress',
|
||||
'ethertype': 'IPv4',
|
||||
'ethertype': const.IPv4,
|
||||
'remote_group_id': security_group_id,
|
||||
'remote_ip_prefix': None,
|
||||
'protocol': None,
|
||||
@ -584,15 +586,15 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
self._assert_sg_rule_has_kvs(v4_ingress, expected)
|
||||
|
||||
# Verify default rule for v6 ingress
|
||||
rules = filter(
|
||||
lambda x: (
|
||||
x['direction'] == 'ingress' and x['ethertype'] == 'IPv6'),
|
||||
sg_rules)
|
||||
rules = [
|
||||
r for r in sg_rules
|
||||
if r['direction'] == 'ingress' and r['ethertype'] == const.IPv6
|
||||
]
|
||||
self.assertEqual(len(rules), 1)
|
||||
v6_ingress = rules[0]
|
||||
|
||||
expected = {'direction': 'ingress',
|
||||
'ethertype': 'IPv6',
|
||||
'ethertype': const.IPv6,
|
||||
'remote_group_id': security_group_id,
|
||||
'remote_ip_prefix': None,
|
||||
'protocol': None,
|
||||
@ -607,7 +609,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
security_group_id = sg['security_group']['id']
|
||||
direction = "ingress"
|
||||
remote_ip_prefix = "10.0.0.0/24"
|
||||
protocol = 'tcp'
|
||||
protocol = const.PROTO_NAME_TCP
|
||||
port_range_min = 22
|
||||
port_range_max = 22
|
||||
keys = [('remote_ip_prefix', remote_ip_prefix),
|
||||
@ -631,7 +633,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
security_group_id = sg['security_group']['id']
|
||||
direction = "ingress"
|
||||
remote_group_id = sg2['security_group']['id']
|
||||
protocol = 'tcp'
|
||||
protocol = const.PROTO_NAME_TCP
|
||||
port_range_min = 22
|
||||
port_range_max = 22
|
||||
keys = [('remote_group_id', remote_group_id),
|
||||
@ -655,7 +657,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
security_group_id = sg['security_group']['id']
|
||||
direction = "ingress"
|
||||
remote_ip_prefix = "10.0.0.0/24"
|
||||
protocol = 'icmp'
|
||||
protocol = const.PROTO_NAME_ICMP
|
||||
# port_range_min (ICMP type) is greater than port_range_max
|
||||
# (ICMP code) in order to confirm min <= max port check is
|
||||
# not called for ICMP.
|
||||
@ -681,7 +683,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
security_group_id = sg['security_group']['id']
|
||||
direction = "ingress"
|
||||
remote_ip_prefix = "10.0.0.0/24"
|
||||
protocol = 'icmp'
|
||||
protocol = const.PROTO_NAME_ICMP
|
||||
# ICMP type
|
||||
port_range_min = 8
|
||||
# ICMP code
|
||||
@ -703,7 +705,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
|
||||
direction = "ingress"
|
||||
remote_ip_prefix = "10.0.0.0/24"
|
||||
protocol = 'tcp'
|
||||
protocol = const.PROTO_NAME_TCP
|
||||
port_range_min = 22
|
||||
port_range_max = 22
|
||||
remote_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087"
|
||||
@ -714,13 +716,13 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
remote_group_id)
|
||||
res = self._create_security_group_rule(self.fmt, rule)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 400)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
|
||||
|
||||
def test_create_security_group_rule_bad_security_group_id(self):
|
||||
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
|
||||
direction = "ingress"
|
||||
remote_ip_prefix = "10.0.0.0/24"
|
||||
protocol = 'tcp'
|
||||
protocol = const.PROTO_NAME_TCP
|
||||
port_range_min = 22
|
||||
port_range_max = 22
|
||||
rule = self._build_security_group_rule(security_group_id, direction,
|
||||
@ -729,21 +731,21 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
remote_ip_prefix)
|
||||
res = self._create_security_group_rule(self.fmt, rule)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 404)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
|
||||
|
||||
def test_create_security_group_rule_bad_tenant(self):
|
||||
with self.security_group() as sg:
|
||||
rule = {'security_group_rule':
|
||||
{'security_group_id': sg['security_group']['id'],
|
||||
'direction': 'ingress',
|
||||
'protocol': 'tcp',
|
||||
'protocol': const.PROTO_NAME_TCP,
|
||||
'port_range_min': '22',
|
||||
'port_range_max': '22',
|
||||
'tenant_id': "bad_tenant"}}
|
||||
|
||||
res = self._create_security_group_rule(self.fmt, rule)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 404)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
|
||||
|
||||
def test_create_security_group_rule_bad_tenant_remote_group_id(self):
|
||||
with self.security_group() as sg:
|
||||
@ -754,7 +756,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
rule = {'security_group_rule':
|
||||
{'security_group_id': sg2['security_group']['id'],
|
||||
'direction': 'ingress',
|
||||
'protocol': 'tcp',
|
||||
'protocol': const.PROTO_NAME_TCP,
|
||||
'port_range_min': '22',
|
||||
'port_range_max': '22',
|
||||
'tenant_id': 'bad_tenant',
|
||||
@ -764,7 +766,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
tenant_id='bad_tenant',
|
||||
set_context=True)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 404)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
|
||||
|
||||
def test_create_security_group_rule_bad_tenant_security_group_rule(self):
|
||||
with self.security_group() as sg:
|
||||
@ -775,7 +777,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
rule = {'security_group_rule':
|
||||
{'security_group_id': sg['security_group']['id'],
|
||||
'direction': 'ingress',
|
||||
'protocol': 'tcp',
|
||||
'protocol': const.PROTO_NAME_TCP,
|
||||
'port_range_min': '22',
|
||||
'port_range_max': '22',
|
||||
'tenant_id': 'bad_tenant'}}
|
||||
@ -784,7 +786,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
tenant_id='bad_tenant',
|
||||
set_context=True)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 404)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
|
||||
|
||||
def test_create_security_group_rule_bad_remote_group_id(self):
|
||||
name = 'webservers'
|
||||
@ -793,7 +795,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
security_group_id = sg['security_group']['id']
|
||||
remote_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
|
||||
direction = "ingress"
|
||||
protocol = 'tcp'
|
||||
protocol = const.PROTO_NAME_TCP
|
||||
port_range_min = 22
|
||||
port_range_max = 22
|
||||
rule = self._build_security_group_rule(security_group_id, direction,
|
||||
@ -802,7 +804,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
remote_group_id=remote_group_id)
|
||||
res = self._create_security_group_rule(self.fmt, rule)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 404)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
|
||||
|
||||
def test_create_security_group_rule_duplicate_rules(self):
|
||||
name = 'webservers'
|
||||
@ -811,11 +813,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
security_group_id = sg['security_group']['id']
|
||||
with self.security_group_rule(security_group_id):
|
||||
rule = self._build_security_group_rule(
|
||||
sg['security_group']['id'], 'ingress', 'tcp', '22', '22')
|
||||
sg['security_group']['id'], 'ingress',
|
||||
const.PROTO_NAME_TCP, '22', '22')
|
||||
self._create_security_group_rule(self.fmt, rule)
|
||||
res = self._create_security_group_rule(self.fmt, rule)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 409)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
|
||||
|
||||
def test_create_security_group_rule_min_port_greater_max(self):
|
||||
name = 'webservers'
|
||||
@ -823,14 +826,16 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
with self.security_group(name, description) as sg:
|
||||
security_group_id = sg['security_group']['id']
|
||||
with self.security_group_rule(security_group_id):
|
||||
for protocol in ['tcp', 'udp', 6, 17]:
|
||||
for protocol in [const.PROTO_NAME_TCP, const.PROTO_NAME_UDP,
|
||||
const.PROTO_NUM_TCP, const.PROTO_NUM_UDP]:
|
||||
rule = self._build_security_group_rule(
|
||||
sg['security_group']['id'],
|
||||
'ingress', protocol, '50', '22')
|
||||
self._create_security_group_rule(self.fmt, rule)
|
||||
res = self._create_security_group_rule(self.fmt, rule)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 400)
|
||||
self.assertEqual(res.status_int,
|
||||
webob.exc.HTTPBadRequest.code)
|
||||
|
||||
def test_create_security_group_rule_ports_but_no_protocol(self):
|
||||
name = 'webservers'
|
||||
@ -843,7 +848,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
self._create_security_group_rule(self.fmt, rule)
|
||||
res = self._create_security_group_rule(self.fmt, rule)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 400)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
|
||||
|
||||
def test_create_security_group_rule_port_range_min_only(self):
|
||||
name = 'webservers'
|
||||
@ -852,11 +857,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
security_group_id = sg['security_group']['id']
|
||||
with self.security_group_rule(security_group_id):
|
||||
rule = self._build_security_group_rule(
|
||||
sg['security_group']['id'], 'ingress', 'tcp', '22', None)
|
||||
sg['security_group']['id'], 'ingress',
|
||||
const.PROTO_NAME_TCP, '22', None)
|
||||
self._create_security_group_rule(self.fmt, rule)
|
||||
res = self._create_security_group_rule(self.fmt, rule)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 400)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
|
||||
|
||||
def test_create_security_group_rule_port_range_max_only(self):
|
||||
name = 'webservers'
|
||||
@ -865,11 +871,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
security_group_id = sg['security_group']['id']
|
||||
with self.security_group_rule(security_group_id):
|
||||
rule = self._build_security_group_rule(
|
||||
sg['security_group']['id'], 'ingress', 'tcp', None, '22')
|
||||
sg['security_group']['id'], 'ingress',
|
||||
const.PROTO_NAME_TCP, None, '22')
|
||||
self._create_security_group_rule(self.fmt, rule)
|
||||
res = self._create_security_group_rule(self.fmt, rule)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 400)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
|
||||
|
||||
def test_create_security_group_rule_icmp_type_too_big(self):
|
||||
name = 'webservers'
|
||||
@ -878,11 +885,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
security_group_id = sg['security_group']['id']
|
||||
with self.security_group_rule(security_group_id):
|
||||
rule = self._build_security_group_rule(
|
||||
sg['security_group']['id'], 'ingress', 'icmp', '256', None)
|
||||
sg['security_group']['id'], 'ingress',
|
||||
const.PROTO_NAME_ICMP, '256', None)
|
||||
self._create_security_group_rule(self.fmt, rule)
|
||||
res = self._create_security_group_rule(self.fmt, rule)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 400)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
|
||||
|
||||
def test_create_security_group_rule_icmp_code_too_big(self):
|
||||
name = 'webservers'
|
||||
@ -891,11 +899,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
security_group_id = sg['security_group']['id']
|
||||
with self.security_group_rule(security_group_id):
|
||||
rule = self._build_security_group_rule(
|
||||
sg['security_group']['id'], 'ingress', 'icmp', '8', '256')
|
||||
sg['security_group']['id'], 'ingress',
|
||||
const.PROTO_NAME_ICMP, '8', '256')
|
||||
self._create_security_group_rule(self.fmt, rule)
|
||||
res = self._create_security_group_rule(self.fmt, rule)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 400)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
|
||||
|
||||
def test_list_ports_security_group(self):
|
||||
with self.network() as n:
|
||||
@ -1107,7 +1116,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
security_groups=['bad_id'])
|
||||
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 400)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
|
||||
|
||||
def test_create_delete_security_group_port_in_use(self):
|
||||
with self.network() as n:
|
||||
@ -1121,7 +1130,8 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
sg['security_group']['id'])
|
||||
# try to delete security group that's in use
|
||||
res = self._delete('security-groups',
|
||||
sg['security_group']['id'], 409)
|
||||
sg['security_group']['id'],
|
||||
webob.exc.HTTPConflict.code)
|
||||
# delete the blocking port
|
||||
self._delete('ports', port['port']['id'])
|
||||
|
||||
@ -1131,16 +1141,18 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
"security_group_rule create")
|
||||
with self.security_group() as sg:
|
||||
rule1 = self._build_security_group_rule(sg['security_group']['id'],
|
||||
'ingress', 'tcp', '22',
|
||||
'ingress',
|
||||
const.PROTO_NAME_TCP, '22',
|
||||
'22', '10.0.0.1/24')
|
||||
rule2 = self._build_security_group_rule(sg['security_group']['id'],
|
||||
'ingress', 'tcp', '23',
|
||||
'ingress',
|
||||
const.PROTO_NAME_TCP, '23',
|
||||
'23', '10.0.0.1/24')
|
||||
rules = {'security_group_rules': [rule1['security_group_rule'],
|
||||
rule2['security_group_rule']]}
|
||||
res = self._create_security_group_rule(self.fmt, rules)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 201)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
|
||||
|
||||
def test_create_security_group_rule_bulk_emulated(self):
|
||||
real_has_attr = hasattr
|
||||
@ -1155,17 +1167,17 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
new=fakehasattr):
|
||||
with self.security_group() as sg:
|
||||
rule1 = self._build_security_group_rule(
|
||||
sg['security_group']['id'], 'ingress', 'tcp', '22', '22',
|
||||
'10.0.0.1/24')
|
||||
sg['security_group']['id'], 'ingress',
|
||||
const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24')
|
||||
rule2 = self._build_security_group_rule(
|
||||
sg['security_group']['id'], 'ingress', 'tcp', '23', '23',
|
||||
'10.0.0.1/24')
|
||||
sg['security_group']['id'], 'ingress',
|
||||
const.PROTO_NAME_TCP, '23', '23', '10.0.0.1/24')
|
||||
rules = {'security_group_rules': [rule1['security_group_rule'],
|
||||
rule2['security_group_rule']]
|
||||
}
|
||||
res = self._create_security_group_rule(self.fmt, rules)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 201)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
|
||||
|
||||
def test_create_security_group_rule_duplicate_rule_in_post(self):
|
||||
if self._skip_native_bulk:
|
||||
@ -1173,13 +1185,14 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
"security_group_rule create")
|
||||
with self.security_group() as sg:
|
||||
rule = self._build_security_group_rule(sg['security_group']['id'],
|
||||
'ingress', 'tcp', '22',
|
||||
'ingress',
|
||||
const.PROTO_NAME_TCP, '22',
|
||||
'22', '10.0.0.1/24')
|
||||
rules = {'security_group_rules': [rule['security_group_rule'],
|
||||
rule['security_group_rule']]}
|
||||
res = self._create_security_group_rule(self.fmt, rules)
|
||||
rule = self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 409)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
|
||||
|
||||
def test_create_security_group_rule_duplicate_rule_in_post_emulated(self):
|
||||
real_has_attr = hasattr
|
||||
@ -1195,13 +1208,13 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
|
||||
with self.security_group() as sg:
|
||||
rule = self._build_security_group_rule(
|
||||
sg['security_group']['id'], 'ingress', 'tcp', '22', '22',
|
||||
'10.0.0.1/24')
|
||||
sg['security_group']['id'], 'ingress',
|
||||
const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24')
|
||||
rules = {'security_group_rules': [rule['security_group_rule'],
|
||||
rule['security_group_rule']]}
|
||||
res = self._create_security_group_rule(self.fmt, rules)
|
||||
rule = self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 409)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
|
||||
|
||||
def test_create_security_group_rule_duplicate_rule_db(self):
|
||||
if self._skip_native_bulk:
|
||||
@ -1209,13 +1222,14 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
"security_group_rule create")
|
||||
with self.security_group() as sg:
|
||||
rule = self._build_security_group_rule(sg['security_group']['id'],
|
||||
'ingress', 'tcp', '22',
|
||||
'ingress',
|
||||
const.PROTO_NAME_TCP, '22',
|
||||
'22', '10.0.0.1/24')
|
||||
rules = {'security_group_rules': [rule]}
|
||||
self._create_security_group_rule(self.fmt, rules)
|
||||
res = self._create_security_group_rule(self.fmt, rules)
|
||||
rule = self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 409)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
|
||||
|
||||
def test_create_security_group_rule_duplicate_rule_db_emulated(self):
|
||||
real_has_attr = hasattr
|
||||
@ -1230,13 +1244,13 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
new=fakehasattr):
|
||||
with self.security_group() as sg:
|
||||
rule = self._build_security_group_rule(
|
||||
sg['security_group']['id'], 'ingress', 'tcp', '22', '22',
|
||||
'10.0.0.1/24')
|
||||
sg['security_group']['id'], 'ingress',
|
||||
const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24')
|
||||
rules = {'security_group_rules': [rule]}
|
||||
self._create_security_group_rule(self.fmt, rules)
|
||||
res = self._create_security_group_rule(self.fmt, rule)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 409)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
|
||||
|
||||
def test_create_security_group_rule_differnt_security_group_ids(self):
|
||||
if self._skip_native_bulk:
|
||||
@ -1245,24 +1259,24 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
with self.security_group() as sg1:
|
||||
with self.security_group() as sg2:
|
||||
rule1 = self._build_security_group_rule(
|
||||
sg1['security_group']['id'], 'ingress', 'tcp', '22', '22',
|
||||
'10.0.0.1/24')
|
||||
sg1['security_group']['id'], 'ingress',
|
||||
const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24')
|
||||
rule2 = self._build_security_group_rule(
|
||||
sg2['security_group']['id'], 'ingress', 'tcp', '23', '23',
|
||||
'10.0.0.1/24')
|
||||
sg2['security_group']['id'], 'ingress',
|
||||
const.PROTO_NAME_TCP, '23', '23', '10.0.0.1/24')
|
||||
|
||||
rules = {'security_group_rules': [rule1['security_group_rule'],
|
||||
rule2['security_group_rule']]
|
||||
}
|
||||
res = self._create_security_group_rule(self.fmt, rules)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 400)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
|
||||
|
||||
def test_create_security_group_rule_with_invalid_ethertype(self):
|
||||
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
|
||||
direction = "ingress"
|
||||
remote_ip_prefix = "10.0.0.0/24"
|
||||
protocol = 'tcp'
|
||||
protocol = const.PROTO_NAME_TCP
|
||||
port_range_min = 22
|
||||
port_range_max = 22
|
||||
remote_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087"
|
||||
@ -1274,7 +1288,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
ethertype='IPv5')
|
||||
res = self._create_security_group_rule(self.fmt, rule)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 400)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
|
||||
|
||||
def test_create_security_group_rule_with_invalid_protocol(self):
|
||||
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
|
||||
@ -1291,7 +1305,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
remote_group_id)
|
||||
res = self._create_security_group_rule(self.fmt, rule)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 400)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
|
||||
|
||||
def test_create_port_with_non_uuid(self):
|
||||
with self.network() as n:
|
||||
@ -1300,7 +1314,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
|
||||
security_groups=['not_valid'])
|
||||
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 400)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
|
||||
|
||||
|
||||
class TestSecurityGroupsXML(TestSecurityGroups):
|
||||
|
@ -21,11 +21,13 @@ import mock
|
||||
from mock import call
|
||||
import mox
|
||||
from oslo.config import cfg
|
||||
import webob.exc
|
||||
|
||||
from neutron.agent import firewall as firewall_base
|
||||
from neutron.agent.linux import iptables_manager
|
||||
from neutron.agent import rpc as agent_rpc
|
||||
from neutron.agent import securitygroups_rpc as sg_rpc
|
||||
from neutron.common import constants as const
|
||||
from neutron import context
|
||||
from neutron.db import securitygroups_rpc_base as sg_db_rpc
|
||||
from neutron.extensions import allowedaddresspairs as addr_pair
|
||||
@ -54,7 +56,7 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
|
||||
self.rpc = FakeSGCallback()
|
||||
|
||||
def test_security_group_rules_for_devices_ipv4_ingress(self):
|
||||
fake_prefix = test_fw.FAKE_PREFIX['IPv4']
|
||||
fake_prefix = test_fw.FAKE_PREFIX[const.IPv4]
|
||||
with self.network() as n:
|
||||
with nested(self.subnet(n),
|
||||
self.security_group()) as (subnet_v4,
|
||||
@ -62,18 +64,18 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
|
||||
sg1_id = sg1['security_group']['id']
|
||||
rule1 = self._build_security_group_rule(
|
||||
sg1_id,
|
||||
'ingress', 'tcp', '22',
|
||||
'ingress', const.PROTO_NAME_TCP, '22',
|
||||
'22')
|
||||
rule2 = self._build_security_group_rule(
|
||||
sg1_id,
|
||||
'ingress', 'tcp', '23',
|
||||
'ingress', const.PROTO_NAME_TCP, '23',
|
||||
'23', fake_prefix)
|
||||
rules = {
|
||||
'security_group_rules': [rule1['security_group_rule'],
|
||||
rule2['security_group_rule']]}
|
||||
res = self._create_security_group_rule(self.fmt, rules)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 201)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
|
||||
|
||||
res1 = self._create_port(
|
||||
self.fmt, n['network']['id'],
|
||||
@ -86,17 +88,19 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
|
||||
ports_rpc = self.rpc.security_group_rules_for_devices(
|
||||
ctx, devices=devices)
|
||||
port_rpc = ports_rpc[port_id1]
|
||||
expected = [{'direction': 'egress', 'ethertype': 'IPv4',
|
||||
expected = [{'direction': 'egress', 'ethertype': const.IPv4,
|
||||
'security_group_id': sg1_id},
|
||||
{'direction': 'egress', 'ethertype': 'IPv6',
|
||||
{'direction': 'egress', 'ethertype': const.IPv6,
|
||||
'security_group_id': sg1_id},
|
||||
{'direction': 'ingress',
|
||||
'protocol': 'tcp', 'ethertype': 'IPv4',
|
||||
'protocol': const.PROTO_NAME_TCP,
|
||||
'ethertype': const.IPv4,
|
||||
'port_range_max': 22,
|
||||
'security_group_id': sg1_id,
|
||||
'port_range_min': 22},
|
||||
{'direction': 'ingress', 'protocol': 'tcp',
|
||||
'ethertype': 'IPv4',
|
||||
{'direction': 'ingress',
|
||||
'protocol': const.PROTO_NAME_TCP,
|
||||
'ethertype': const.IPv4,
|
||||
'port_range_max': 23, 'security_group_id': sg1_id,
|
||||
'port_range_min': 23,
|
||||
'source_ip_prefix': fake_prefix},
|
||||
@ -169,7 +173,7 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
|
||||
self._delete('ports', port_id1)
|
||||
|
||||
def test_security_group_rules_for_devices_ipv4_egress(self):
|
||||
fake_prefix = test_fw.FAKE_PREFIX['IPv4']
|
||||
fake_prefix = test_fw.FAKE_PREFIX[const.IPv4]
|
||||
with self.network() as n:
|
||||
with nested(self.subnet(n),
|
||||
self.security_group()) as (subnet_v4,
|
||||
@ -177,18 +181,18 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
|
||||
sg1_id = sg1['security_group']['id']
|
||||
rule1 = self._build_security_group_rule(
|
||||
sg1_id,
|
||||
'egress', 'tcp', '22',
|
||||
'egress', const.PROTO_NAME_TCP, '22',
|
||||
'22')
|
||||
rule2 = self._build_security_group_rule(
|
||||
sg1_id,
|
||||
'egress', 'udp', '23',
|
||||
'egress', const.PROTO_NAME_UDP, '23',
|
||||
'23', fake_prefix)
|
||||
rules = {
|
||||
'security_group_rules': [rule1['security_group_rule'],
|
||||
rule2['security_group_rule']]}
|
||||
res = self._create_security_group_rule(self.fmt, rules)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 201)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
|
||||
|
||||
res1 = self._create_port(
|
||||
self.fmt, n['network']['id'],
|
||||
@ -201,17 +205,19 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
|
||||
ports_rpc = self.rpc.security_group_rules_for_devices(
|
||||
ctx, devices=devices)
|
||||
port_rpc = ports_rpc[port_id1]
|
||||
expected = [{'direction': 'egress', 'ethertype': 'IPv4',
|
||||
expected = [{'direction': 'egress', 'ethertype': const.IPv4,
|
||||
'security_group_id': sg1_id},
|
||||
{'direction': 'egress', 'ethertype': 'IPv6',
|
||||
{'direction': 'egress', 'ethertype': const.IPv6,
|
||||
'security_group_id': sg1_id},
|
||||
{'direction': 'egress',
|
||||
'protocol': 'tcp', 'ethertype': 'IPv4',
|
||||
'protocol': const.PROTO_NAME_TCP,
|
||||
'ethertype': const.IPv4,
|
||||
'port_range_max': 22,
|
||||
'security_group_id': sg1_id,
|
||||
'port_range_min': 22},
|
||||
{'direction': 'egress', 'protocol': 'udp',
|
||||
'ethertype': 'IPv4',
|
||||
{'direction': 'egress',
|
||||
'protocol': const.PROTO_NAME_UDP,
|
||||
'ethertype': const.IPv4,
|
||||
'port_range_max': 23, 'security_group_id': sg1_id,
|
||||
'port_range_min': 23,
|
||||
'dest_ip_prefix': fake_prefix},
|
||||
@ -232,13 +238,13 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
|
||||
sg2_id = sg2['security_group']['id']
|
||||
rule1 = self._build_security_group_rule(
|
||||
sg1_id,
|
||||
'ingress', 'tcp', '24',
|
||||
'ingress', const.PROTO_NAME_TCP, '24',
|
||||
'25', remote_group_id=sg2['security_group']['id'])
|
||||
rules = {
|
||||
'security_group_rules': [rule1['security_group_rule']]}
|
||||
res = self._create_security_group_rule(self.fmt, rules)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 201)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
|
||||
|
||||
res1 = self._create_port(
|
||||
self.fmt, n['network']['id'],
|
||||
@ -258,17 +264,18 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
|
||||
ports_rpc = self.rpc.security_group_rules_for_devices(
|
||||
ctx, devices=devices)
|
||||
port_rpc = ports_rpc[port_id1]
|
||||
expected = [{'direction': 'egress', 'ethertype': 'IPv4',
|
||||
expected = [{'direction': 'egress', 'ethertype': const.IPv4,
|
||||
'security_group_id': sg1_id},
|
||||
{'direction': 'egress', 'ethertype': 'IPv6',
|
||||
{'direction': 'egress', 'ethertype': const.IPv6,
|
||||
'security_group_id': sg1_id},
|
||||
{'direction': 'egress', 'ethertype': 'IPv4',
|
||||
{'direction': 'egress', 'ethertype': const.IPv4,
|
||||
'security_group_id': sg2_id},
|
||||
{'direction': 'egress', 'ethertype': 'IPv6',
|
||||
{'direction': 'egress', 'ethertype': const.IPv6,
|
||||
'security_group_id': sg2_id},
|
||||
{'direction': u'ingress',
|
||||
'source_ip_prefix': u'10.0.0.3/32',
|
||||
'protocol': u'tcp', 'ethertype': u'IPv4',
|
||||
'protocol': const.PROTO_NAME_TCP,
|
||||
'ethertype': const.IPv4,
|
||||
'port_range_max': 25, 'port_range_min': 24,
|
||||
'remote_group_id': sg2_id,
|
||||
'security_group_id': sg1_id},
|
||||
@ -279,7 +286,7 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
|
||||
self._delete('ports', port_id2)
|
||||
|
||||
def test_security_group_rules_for_devices_ipv6_ingress(self):
|
||||
fake_prefix = test_fw.FAKE_PREFIX['IPv6']
|
||||
fake_prefix = test_fw.FAKE_PREFIX[const.IPv6]
|
||||
with self.network() as n:
|
||||
with nested(self.subnet(n,
|
||||
cidr=fake_prefix,
|
||||
@ -289,20 +296,20 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
|
||||
sg1_id = sg1['security_group']['id']
|
||||
rule1 = self._build_security_group_rule(
|
||||
sg1_id,
|
||||
'ingress', 'tcp', '22',
|
||||
'ingress', const.PROTO_NAME_TCP, '22',
|
||||
'22',
|
||||
ethertype='IPv6')
|
||||
ethertype=const.IPv6)
|
||||
rule2 = self._build_security_group_rule(
|
||||
sg1_id,
|
||||
'ingress', 'udp', '23',
|
||||
'ingress', const.PROTO_NAME_UDP, '23',
|
||||
'23', fake_prefix,
|
||||
ethertype='IPv6')
|
||||
ethertype=const.IPv6)
|
||||
rules = {
|
||||
'security_group_rules': [rule1['security_group_rule'],
|
||||
rule2['security_group_rule']]}
|
||||
res = self._create_security_group_rule(self.fmt, rules)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 201)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
|
||||
|
||||
res1 = self._create_port(
|
||||
self.fmt, n['network']['id'],
|
||||
@ -316,17 +323,19 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
|
||||
ports_rpc = self.rpc.security_group_rules_for_devices(
|
||||
ctx, devices=devices)
|
||||
port_rpc = ports_rpc[port_id1]
|
||||
expected = [{'direction': 'egress', 'ethertype': 'IPv4',
|
||||
expected = [{'direction': 'egress', 'ethertype': const.IPv4,
|
||||
'security_group_id': sg1_id},
|
||||
{'direction': 'egress', 'ethertype': 'IPv6',
|
||||
{'direction': 'egress', 'ethertype': const.IPv6,
|
||||
'security_group_id': sg1_id},
|
||||
{'direction': 'ingress',
|
||||
'protocol': 'tcp', 'ethertype': 'IPv6',
|
||||
'protocol': const.PROTO_NAME_TCP,
|
||||
'ethertype': const.IPv6,
|
||||
'port_range_max': 22,
|
||||
'security_group_id': sg1_id,
|
||||
'port_range_min': 22},
|
||||
{'direction': 'ingress', 'protocol': 'udp',
|
||||
'ethertype': 'IPv6',
|
||||
{'direction': 'ingress',
|
||||
'protocol': const.PROTO_NAME_UDP,
|
||||
'ethertype': const.IPv6,
|
||||
'port_range_max': 23, 'security_group_id': sg1_id,
|
||||
'port_range_min': 23,
|
||||
'source_ip_prefix': fake_prefix},
|
||||
@ -336,7 +345,7 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
|
||||
self._delete('ports', port_id1)
|
||||
|
||||
def test_security_group_rules_for_devices_ipv6_egress(self):
|
||||
fake_prefix = test_fw.FAKE_PREFIX['IPv6']
|
||||
fake_prefix = test_fw.FAKE_PREFIX[const.IPv6]
|
||||
with self.network() as n:
|
||||
with nested(self.subnet(n,
|
||||
cidr=fake_prefix,
|
||||
@ -346,20 +355,20 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
|
||||
sg1_id = sg1['security_group']['id']
|
||||
rule1 = self._build_security_group_rule(
|
||||
sg1_id,
|
||||
'egress', 'tcp', '22',
|
||||
'egress', const.PROTO_NAME_TCP, '22',
|
||||
'22',
|
||||
ethertype='IPv6')
|
||||
ethertype=const.IPv6)
|
||||
rule2 = self._build_security_group_rule(
|
||||
sg1_id,
|
||||
'egress', 'udp', '23',
|
||||
'egress', const.PROTO_NAME_UDP, '23',
|
||||
'23', fake_prefix,
|
||||
ethertype='IPv6')
|
||||
ethertype=const.IPv6)
|
||||
rules = {
|
||||
'security_group_rules': [rule1['security_group_rule'],
|
||||
rule2['security_group_rule']]}
|
||||
res = self._create_security_group_rule(self.fmt, rules)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 201)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
|
||||
|
||||
res1 = self._create_port(
|
||||
self.fmt, n['network']['id'],
|
||||
@ -374,18 +383,21 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
|
||||
ports_rpc = self.rpc.security_group_rules_for_devices(
|
||||
ctx, devices=devices)
|
||||
port_rpc = ports_rpc[port_id1]
|
||||
expected = [{'direction': 'egress', 'ethertype': 'IPv4',
|
||||
expected = [{'direction': 'egress', 'ethertype': const.IPv4,
|
||||
'security_group_id': sg1_id},
|
||||
{'direction': 'egress', 'ethertype': 'IPv6',
|
||||
{'direction': 'egress', 'ethertype': const.IPv6,
|
||||
'security_group_id': sg1_id},
|
||||
{'direction': 'egress',
|
||||
'protocol': 'tcp', 'ethertype': 'IPv6',
|
||||
'protocol': const.PROTO_NAME_TCP,
|
||||
'ethertype': const.IPv6,
|
||||
'port_range_max': 22,
|
||||
'security_group_id': sg1_id,
|
||||
'port_range_min': 22},
|
||||
{'direction': 'egress', 'protocol': 'udp',
|
||||
'ethertype': 'IPv6',
|
||||
'port_range_max': 23, 'security_group_id': sg1_id,
|
||||
{'direction': 'egress',
|
||||
'protocol': const.PROTO_NAME_UDP,
|
||||
'ethertype': const.IPv6,
|
||||
'port_range_max': 23,
|
||||
'security_group_id': sg1_id,
|
||||
'port_range_min': 23,
|
||||
'dest_ip_prefix': fake_prefix},
|
||||
]
|
||||
@ -394,7 +406,7 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
|
||||
self._delete('ports', port_id1)
|
||||
|
||||
def test_security_group_rules_for_devices_ipv6_source_group(self):
|
||||
fake_prefix = test_fw.FAKE_PREFIX['IPv6']
|
||||
fake_prefix = test_fw.FAKE_PREFIX[const.IPv6]
|
||||
with self.network() as n:
|
||||
with nested(self.subnet(n,
|
||||
cidr=fake_prefix,
|
||||
@ -407,15 +419,15 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
|
||||
sg2_id = sg2['security_group']['id']
|
||||
rule1 = self._build_security_group_rule(
|
||||
sg1_id,
|
||||
'ingress', 'tcp', '24',
|
||||
'ingress', const.PROTO_NAME_TCP, '24',
|
||||
'25',
|
||||
ethertype='IPv6',
|
||||
ethertype=const.IPv6,
|
||||
remote_group_id=sg2['security_group']['id'])
|
||||
rules = {
|
||||
'security_group_rules': [rule1['security_group_rule']]}
|
||||
res = self._create_security_group_rule(self.fmt, rules)
|
||||
self.deserialize(self.fmt, res)
|
||||
self.assertEqual(res.status_int, 201)
|
||||
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
|
||||
|
||||
res1 = self._create_port(
|
||||
self.fmt, n['network']['id'],
|
||||
@ -438,17 +450,18 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
|
||||
ports_rpc = self.rpc.security_group_rules_for_devices(
|
||||
ctx, devices=devices)
|
||||
port_rpc = ports_rpc[port_id1]
|
||||
expected = [{'direction': 'egress', 'ethertype': 'IPv4',
|
||||
expected = [{'direction': 'egress', 'ethertype': const.IPv4,
|
||||
'security_group_id': sg1_id},
|
||||
{'direction': 'egress', 'ethertype': 'IPv6',
|
||||
{'direction': 'egress', 'ethertype': const.IPv6,
|
||||
'security_group_id': sg1_id},
|
||||
{'direction': 'egress', 'ethertype': 'IPv4',
|
||||
{'direction': 'egress', 'ethertype': const.IPv4,
|
||||
'security_group_id': sg2_id},
|
||||
{'direction': 'egress', 'ethertype': 'IPv6',
|
||||
{'direction': 'egress', 'ethertype': const.IPv6,
|
||||
'security_group_id': sg2_id},
|
||||
{'direction': 'ingress',
|
||||
'source_ip_prefix': 'fe80::3/128',
|
||||
'protocol': 'tcp', 'ethertype': 'IPv6',
|
||||
'protocol': const.PROTO_NAME_TCP,
|
||||
'ethertype': const.IPv6,
|
||||
'port_range_max': 25, 'port_range_min': 24,
|
||||
'remote_group_id': sg2_id,
|
||||
'security_group_id': sg1_id},
|
||||
@ -1220,36 +1233,36 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
|
||||
self.rpc = mock.Mock()
|
||||
self.agent.plugin_rpc = self.rpc
|
||||
rule1 = [{'direction': 'ingress',
|
||||
'protocol': 'udp',
|
||||
'ethertype': 'IPv4',
|
||||
'protocol': const.PROTO_NAME_UDP,
|
||||
'ethertype': const.IPv4,
|
||||
'source_ip_prefix': '10.0.0.2',
|
||||
'source_port_range_min': 67,
|
||||
'source_port_range_max': 67,
|
||||
'port_range_min': 68,
|
||||
'port_range_max': 68},
|
||||
{'direction': 'ingress',
|
||||
'protocol': 'tcp',
|
||||
'ethertype': 'IPv4',
|
||||
'protocol': const.PROTO_NAME_TCP,
|
||||
'ethertype': const.IPv4,
|
||||
'port_range_min': 22,
|
||||
'port_range_max': 22},
|
||||
{'direction': 'egress',
|
||||
'ethertype': 'IPv4'}]
|
||||
'ethertype': const.IPv4}]
|
||||
rule2 = rule1[:]
|
||||
rule2 += [{'direction': 'ingress',
|
||||
'source_ip_prefix': '10.0.0.4',
|
||||
'ethertype': 'IPv4'}]
|
||||
'ethertype': const.IPv4}]
|
||||
rule3 = rule2[:]
|
||||
rule3 += [{'direction': 'ingress',
|
||||
'protocol': 'icmp',
|
||||
'ethertype': 'IPv4'}]
|
||||
'protocol': const.PROTO_NAME_ICMP,
|
||||
'ethertype': const.IPv4}]
|
||||
rule4 = rule1[:]
|
||||
rule4 += [{'direction': 'ingress',
|
||||
'source_ip_prefix': '10.0.0.3',
|
||||
'ethertype': 'IPv4'}]
|
||||
'ethertype': const.IPv4}]
|
||||
rule5 = rule4[:]
|
||||
rule5 += [{'direction': 'ingress',
|
||||
'protocol': 'icmp',
|
||||
'ethertype': 'IPv4'}]
|
||||
'protocol': const.PROTO_NAME_ICMP,
|
||||
'ethertype': const.IPv4}]
|
||||
self.devices1 = {'tap_port1': self._device('tap_port1',
|
||||
'10.0.0.3',
|
||||
'12:34:56:78:9a:bc',
|
||||
@ -1360,7 +1373,7 @@ class SGNotificationTestMixin():
|
||||
security_group_id = sg['security_group']['id']
|
||||
direction = "ingress"
|
||||
remote_group_id = sg2['security_group']['id']
|
||||
protocol = 'tcp'
|
||||
protocol = const.PROTO_NAME_TCP
|
||||
port_range_min = 88
|
||||
port_range_max = 88
|
||||
with self.security_group_rule(security_group_id, direction,
|
||||
|
Loading…
Reference in New Issue
Block a user