# Copyright (c) 2012 OpenStack Foundation. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. # See the License for the specific language governing permissions and # limitations under the License. import contextlib import os import mock import webob.exc from neutron.api.v2 import attributes as attr from neutron.common import constants as const from neutron import context from neutron.db import db_base_plugin_v2 from neutron.db import securitygroups_db from neutron.extensions import securitygroup as ext_sg from neutron.tests.unit import test_db_plugin DB_PLUGIN_KLASS = ('neutron.tests.unit.test_extension_security_group.' 
'SecurityGroupTestPlugin') ROOTDIR = os.path.dirname(os.path.dirname(__file__)) ETCDIR = os.path.join(ROOTDIR, 'etc') def etcdir(*p): return os.path.join(ETCDIR, *p) class SecurityGroupTestExtensionManager(object): def get_resources(self): # Add the resources to the global attribute map # This is done here as the setup process won't # initialize the main API router which extends # the global attribute map attr.RESOURCE_ATTRIBUTE_MAP.update( ext_sg.RESOURCE_ATTRIBUTE_MAP) return ext_sg.Securitygroup.get_resources() def get_actions(self): return [] def get_request_extensions(self): return [] class SecurityGroupsTestCase(test_db_plugin.NeutronDbPluginV2TestCase): def _create_security_group(self, fmt, name, description, **kwargs): data = {'security_group': {'name': name, 'tenant_id': kwargs.get('tenant_id', 'test_tenant'), 'description': description}} security_group_req = self.new_create_request('security-groups', data, fmt) if (kwargs.get('set_context') and 'tenant_id' in kwargs): # create a specific auth context for this request security_group_req.environ['neutron.context'] = ( context.Context('', kwargs['tenant_id'])) return security_group_req.get_response(self.ext_api) def _build_security_group_rule(self, security_group_id, direction, proto, port_range_min=None, port_range_max=None, remote_ip_prefix=None, remote_group_id=None, tenant_id='test_tenant', ethertype=const.IPv4): data = {'security_group_rule': {'security_group_id': security_group_id, 'direction': direction, 'protocol': proto, 'ethertype': ethertype, 'tenant_id': tenant_id, 'ethertype': ethertype}} if port_range_min: data['security_group_rule']['port_range_min'] = port_range_min if port_range_max: data['security_group_rule']['port_range_max'] = port_range_max if remote_ip_prefix: data['security_group_rule']['remote_ip_prefix'] = remote_ip_prefix if remote_group_id: data['security_group_rule']['remote_group_id'] = remote_group_id return data def _create_security_group_rule(self, fmt, rules, **kwargs): 
security_group_rule_req = self.new_create_request( 'security-group-rules', rules, fmt) if (kwargs.get('set_context') and 'tenant_id' in kwargs): # create a specific auth context for this request security_group_rule_req.environ['neutron.context'] = ( context.Context('', kwargs['tenant_id'])) return security_group_rule_req.get_response(self.ext_api) def _make_security_group(self, fmt, name, description, **kwargs): res = self._create_security_group(fmt, name, description, **kwargs) if res.status_int >= webob.exc.HTTPBadRequest.code: raise webob.exc.HTTPClientError(code=res.status_int) return self.deserialize(fmt, res) def _make_security_group_rule(self, fmt, rules, **kwargs): res = self._create_security_group_rule(self.fmt, rules) if res.status_int >= webob.exc.HTTPBadRequest.code: raise webob.exc.HTTPClientError(code=res.status_int) return self.deserialize(fmt, res) @contextlib.contextmanager def security_group(self, name='webservers', description='webservers', fmt=None, no_delete=False): if not fmt: fmt = self.fmt security_group = self._make_security_group(fmt, name, description) try: yield security_group finally: if not no_delete: self._delete('security-groups', security_group['security_group']['id']) @contextlib.contextmanager def security_group_rule(self, security_group_id='4cd70774-cc67-4a87-9b39-7' 'd1db38eb087', direction='ingress', protocol=const.PROTO_NAME_TCP, port_range_min='22', port_range_max='22', remote_ip_prefix=None, remote_group_id=None, fmt=None, no_delete=False, ethertype=const.IPv4): if not fmt: fmt = self.fmt rule = self._build_security_group_rule(security_group_id, direction, protocol, port_range_min, port_range_max, remote_ip_prefix, remote_group_id, ethertype=ethertype) security_group_rule = self._make_security_group_rule(self.fmt, rule) try: yield security_group_rule finally: if not no_delete: self._delete('security-group-rules', security_group_rule['security_group_rule']['id']) def _delete_default_security_group_egress_rules(self, 
security_group_id): """Deletes default egress rules given a security group ID.""" res = self._list( 'security-group-rules', query_params='security_group_id=%s' % security_group_id) for r in res['security_group_rules']: if (r['direction'] == 'egress' and not r['port_range_max'] and not r['port_range_min'] and not r['protocol'] and not r['remote_ip_prefix']): self._delete('security-group-rules', r['id']) def _assert_sg_rule_has_kvs(self, security_group_rule, expected_kvs): """Asserts that the sg rule has expected key/value pairs passed in as expected_kvs dictionary """ for k, v in expected_kvs.iteritems(): self.assertEqual(security_group_rule[k], v) class SecurityGroupsTestCaseXML(SecurityGroupsTestCase): fmt = 'xml' class SecurityGroupTestPlugin(db_base_plugin_v2.NeutronDbPluginV2, securitygroups_db.SecurityGroupDbMixin): """Test plugin that implements necessary calls on create/delete port for associating ports with security groups. """ __native_pagination_support = True __native_sorting_support = True supported_extension_aliases = ["security-group"] def create_port(self, context, port): tenant_id = self._get_tenant_id_for_create(context, port['port']) default_sg = self._ensure_default_security_group(context, tenant_id) if not attr.is_attr_set(port['port'].get(ext_sg.SECURITYGROUPS)): port['port'][ext_sg.SECURITYGROUPS] = [default_sg] session = context.session with session.begin(subtransactions=True): sgids = self._get_security_groups_on_port(context, port) port = super(SecurityGroupTestPlugin, self).create_port(context, port) self._process_port_create_security_group(context, port, sgids) return port def update_port(self, context, id, port): session = context.session with session.begin(subtransactions=True): if ext_sg.SECURITYGROUPS in port['port']: port['port'][ext_sg.SECURITYGROUPS] = ( self._get_security_groups_on_port(context, port)) # delete the port binding and read it with the new rules self._delete_port_security_group_bindings(context, id) port['port']['id'] 
= id self._process_port_create_security_group( context, port['port'], port['port'].get(ext_sg.SECURITYGROUPS)) port = super(SecurityGroupTestPlugin, self).update_port( context, id, port) return port def create_network(self, context, network): tenant_id = self._get_tenant_id_for_create(context, network['network']) self._ensure_default_security_group(context, tenant_id) return super(SecurityGroupTestPlugin, self).create_network(context, network) def get_ports(self, context, filters=None, fields=None, sorts=[], limit=None, marker=None, page_reverse=False): neutron_lports = super(SecurityGroupTestPlugin, self).get_ports( context, filters, sorts=sorts, limit=limit, marker=marker, page_reverse=page_reverse) return neutron_lports class SecurityGroupDBTestCase(SecurityGroupsTestCase): def setUp(self, plugin=None, ext_mgr=None): plugin = plugin or DB_PLUGIN_KLASS ext_mgr = ext_mgr or SecurityGroupTestExtensionManager() super(SecurityGroupDBTestCase, self).setUp(plugin=plugin, ext_mgr=ext_mgr) class TestSecurityGroups(SecurityGroupDBTestCase): def test_create_security_group(self): name = 'webservers' description = 'my webservers' keys = [('name', name,), ('description', description)] with self.security_group(name, description) as security_group: for k, v, in keys: self.assertEqual(security_group['security_group'][k], v) # Verify that default egress rules have been created sg_rules = security_group['security_group']['security_group_rules'] self.assertEqual(len(sg_rules), 2) v4_rules = [r for r in sg_rules if r['ethertype'] == const.IPv4] self.assertEqual(len(v4_rules), 1) v4_rule = v4_rules[0] expected = {'direction': 'egress', 'ethertype': const.IPv4, 'remote_group_id': None, 'remote_ip_prefix': None, 'protocol': None, 'port_range_max': None, 'port_range_min': None} self._assert_sg_rule_has_kvs(v4_rule, expected) v6_rules = [r for r in sg_rules if r['ethertype'] == const.IPv6] self.assertEqual(len(v6_rules), 1) v6_rule = v6_rules[0] expected = {'direction': 'egress', 
'ethertype': const.IPv6,
                        'remote_group_id': None,
                        'remote_ip_prefix': None,
                        'protocol': None,
                        'port_range_max': None,
                        'port_range_min': None}
            self._assert_sg_rule_has_kvs(v6_rule, expected)

    # Updating name and description of a regular group is echoed back.
    def test_update_security_group(self):
        with self.security_group() as sg:
            data = {'security_group': {'name': 'new_name',
                                       'description': 'new_desc'}}
            req = self.new_update_request('security-groups',
                                          data,
                                          sg['security_group']['id'])
            res = self.deserialize(self.fmt, req.get_response(self.ext_api))
            self.assertEqual(res['security_group']['name'],
                             data['security_group']['name'])
            self.assertEqual(res['security_group']['description'],
                             data['security_group']['description'])

    # Renaming a group to 'default' is expected to answer 409 Conflict.
    def test_update_security_group_name_to_default_fail(self):
        with self.security_group() as sg:
            data = {'security_group': {'name': 'default',
                                       'description': 'new_desc'}}
            req = self.new_update_request('security-groups',
                                          data,
                                          sg['security_group']['id'])
            # Issue the update under a context for tenant 'somebody'.
            req.environ['neutron.context'] = context.Context('', 'somebody')
            res = req.get_response(self.ext_api)
            self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)

    # Updating the first listed group under tenant 'somebody' is expected
    # to answer 404 Not Found.
    def test_update_default_security_group_name_fail(self):
        with self.network():
            res = self.new_list_request('security-groups')
            sg = self.deserialize(self.fmt, res.get_response(self.ext_api))
            data = {'security_group': {'name': 'new_name',
                                       'description': 'new_desc'}}
            req = self.new_update_request('security-groups',
                                          data,
                                          sg['security_groups'][0]['id'])
            req.environ['neutron.context'] = context.Context('', 'somebody')
            res = req.get_response(self.ext_api)
            self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)

    # A description-only update of the first listed group is echoed back.
    def test_update_default_security_group_with_description(self):
        with self.network():
            res = self.new_list_request('security-groups')
            sg = self.deserialize(self.fmt, res.get_response(self.ext_api))
            data = {'security_group': {'description': 'new_desc'}}
            req = self.new_update_request('security-groups',
                                          data,
                                          sg['security_groups'][0]['id'])
            res = self.deserialize(self.fmt, req.get_response(self.ext_api))
            self.assertEqual(res['security_group']['description'],
                             data['security_group']['description'])

    # After creating a network exactly one security group is listed.
    def test_default_security_group(self):
        with self.network():
            res = self.new_list_request('security-groups')
            groups = self.deserialize(self.fmt,
                                      res.get_response(self.ext_api))
            self.assertEqual(len(groups['security_groups']), 1)

    # Explicitly creating a group named 'default' must answer 409 Conflict.
    def test_create_default_security_group_fail(self):
        name = 'default'
        description = 'my webservers'
        res = self._create_security_group(self.fmt, name, description)
        self.deserialize(self.fmt, res)
        self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)

    def test_list_security_groups(self):
        with contextlib.nested(self.security_group(name='sg1',
                                                   description='sg'),
                               self.security_group(name='sg2',
                                                   description='sg'),
                               self.security_group(name='sg3',
                                                   description='sg')
                               ) as security_groups:
            # Filter on the shared description to select the three groups
            # created above.
            self._test_list_resources('security-group',
                                      security_groups,
                                      query_params='description=sg')

    def test_list_security_groups_with_sort(self):
        with contextlib.nested(self.security_group(name='sg1',
                                                   description='sg'),
                               self.security_group(name='sg2',
                                                   description='sg'),
                               self.security_group(name='sg3',
                                                   description='sg')
                               ) as (sg1, sg2, sg3):
            # Sorting by name descending should yield sg3, sg2, sg1.
            self._test_list_with_sort('security-group',
                                      (sg3, sg2, sg1),
                                      [('name', 'desc')],
                                      query_params='description=sg')

    def test_list_security_groups_with_pagination(self):
        with contextlib.nested(self.security_group(name='sg1',
                                                   description='sg'),
                               self.security_group(name='sg2',
                                                   description='sg'),
                               self.security_group(name='sg3',
                                                   description='sg')
                               ) as (sg1, sg2, sg3):
            self._test_list_with_pagination('security-group',
                                            (sg1, sg2, sg3),
                                            ('name', 'asc'), 2, 2,
                                            query_params='description=sg')

    def test_list_security_groups_with_pagination_reverse(self):
        with contextlib.nested(self.security_group(name='sg1',
                                                   description='sg'),
                               self.security_group(name='sg2',
                                                   description='sg'),
                               self.security_group(name='sg3',
                                                   description='sg')
                               ) as (sg1, sg2, sg3):
            self._test_list_with_pagination_reverse(
                'security-group', (sg1, sg2, sg3), ('name', 'asc'), 2, 2,
                query_params='description=sg')

    # A numeric ethertype is expected to be rejected with 400 Bad Request.
    def test_create_security_group_rule_ethertype_invalid_as_number(self):
        name = 'webservers'
        description = 'my webservers'
        with self.security_group(name, description) as sg:
            security_group_id = sg['security_group']['id']
            ethertype = 2
            rule = self._build_security_group_rule(
                security_group_id, 'ingress', const.PROTO_NAME_TCP, '22',
                '22', None, None, ethertype=ethertype)
            res = self._create_security_group_rule(self.fmt, rule)
            self.deserialize(self.fmt, res)
            self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)

    # The TCP protocol given by number is expected to be accepted (201).
    def test_create_security_group_rule_tcp_protocol_as_number(self):
        name = 'webservers'
        description = 'my webservers'
        with self.security_group(name, description) as sg:
            security_group_id = sg['security_group']['id']
            protocol = const.PROTO_NUM_TCP  # TCP
            rule = self._build_security_group_rule(
                security_group_id, 'ingress', protocol, '22', '22')
            res = self._create_security_group_rule(self.fmt, rule)
            self.deserialize(self.fmt, res)
            self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)

    # A protocol number (2) without ports is expected to be accepted (201).
    def test_create_security_group_rule_protocol_as_number(self):
        name = 'webservers'
        description = 'my webservers'
        with self.security_group(name, description) as sg:
            security_group_id = sg['security_group']['id']
            protocol = 2
            rule = self._build_security_group_rule(
                security_group_id, 'ingress', protocol)
            res = self._create_security_group_rule(self.fmt, rule)
            self.deserialize(self.fmt, res)
            self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)

    def test_create_security_group_rule_case_insensitive(self):
        name = 'webservers'
        description = 'my webservers'
        with self.security_group(name, description) as sg:
            security_group_id = sg['security_group']['id']
            direction = "ingress"
            remote_ip_prefix = "10.0.0.0/24"
            # Deliberately mixed-case protocol and ethertype values.
            protocol = 'TCP'
            port_range_min = 22
            port_range_max = 22
            ethertype = 'ipV4'
            with self.security_group_rule(security_group_id, direction,
                                          protocol, port_range_min,
                                          port_range_max,
                                          remote_ip_prefix,
                                          ethertype=ethertype) as rule:

                # the lower case value will be returned
                self.assertEqual(rule['security_group_rule']['protocol'],
                                 protocol.lower())
                self.assertEqual(rule['security_group_rule']['ethertype'],
                                 const.IPv4)

    def test_get_security_group(self):
        name = 'webservers'
        description = 'my webservers'
        with self.security_group(name, description) as sg:
            remote_group_id = sg['security_group']['id']
            # The show request is only built here; it is sent further down,
            # once the extra rule exists.
            res = self.new_show_request('security-groups', remote_group_id)
            security_group_id = sg['security_group']['id']
            direction = "ingress"
            remote_ip_prefix = "10.0.0.0/24"
            protocol = const.PROTO_NAME_TCP
            port_range_min = 22
            port_range_max = 22
            keys = [('remote_ip_prefix', remote_ip_prefix),
                    ('security_group_id', security_group_id),
                    ('direction', direction),
                    ('protocol', protocol),
                    ('port_range_min', port_range_min),
                    ('port_range_max', port_range_max)]
            with self.security_group_rule(security_group_id, direction,
                                          protocol, port_range_min,
                                          port_range_max,
                                          remote_ip_prefix):

                group = self.deserialize(
                    self.fmt, res.get_response(self.ext_api))
                sg_rule = group['security_group']['security_group_rules']
                self.assertEqual(group['security_group']['id'],
                                 remote_group_id)
                # Three rules are expected: the one created above plus the
                # rules the group already had.
                self.assertEqual(len(sg_rule), 3)
                sg_rule = [r for r in sg_rule if r['direction'] == 'ingress']
                for k, v, in keys:
                    self.assertEqual(sg_rule[0][k], v)

    def test_delete_security_group(self):
        name = 'webservers'
        description = 'my webservers'
        # no_delete=True: the test deletes the group itself.
        with self.security_group(name, description, no_delete=True) as sg:
            remote_group_id = sg['security_group']['id']
            self._delete('security-groups', remote_group_id,
                         webob.exc.HTTPNoContent.code)

    # Deleting the first listed group with the default request context is
    # expected to succeed with 204 No Content.
    def test_delete_default_security_group_admin(self):
        with self.network():
            res = self.new_list_request('security-groups')
            sg = self.deserialize(self.fmt, res.get_response(self.ext_api))
            self._delete('security-groups', sg['security_groups'][0]['id'],
                         webob.exc.HTTPNoContent.code)

    # The same delete under a 'test-tenant' context is expected to answer
    # 409 Conflict.
    def test_delete_default_security_group_nonadmin(self):
        with self.network():
            res = self.new_list_request('security-groups')
            sg = self.deserialize(self.fmt, res.get_response(self.ext_api))
            neutron_context = context.Context('', 'test-tenant')
            self._delete('security-groups', sg['security_groups'][0]['id'],
                         webob.exc.HTTPConflict.code,
                         neutron_context=neutron_context)

    # Listing security groups for a tenant is expected to return exactly
    # one group even though none was created explicitly.
    def test_security_group_list_creates_default_security_group(self):
        neutron_context = context.Context('', 'test-tenant')
        sg = self._list('security-groups',
                        neutron_context=neutron_context).get('security_groups')
        self.assertEqual(len(sg), 1)

    def test_default_security_group_rules(self):
        with self.network():
            res = self.new_list_request('security-groups')
            groups = self.deserialize(self.fmt,
                                      res.get_response(self.ext_api))
            self.assertEqual(len(groups['security_groups']), 1)
            security_group_id = groups['security_groups'][0]['id']
            res = self.new_list_request('security-group-rules')
            rules = self.deserialize(self.fmt,
                                     res.get_response(self.ext_api))
            # Four rules expected: egress and ingress for each of IPv4/IPv6.
            self.assertEqual(len(rules['security_group_rules']), 4)

            # Verify default rule for v4 egress
            sg_rules = rules['security_group_rules']
            rules = [
                r for r in sg_rules
                if r['direction'] == 'egress' and r['ethertype'] == const.IPv4
            ]
            self.assertEqual(len(rules), 1)
            v4_egress = rules[0]

            expected = {'direction': 'egress',
                        'ethertype': const.IPv4,
                        'remote_group_id': None,
                        'remote_ip_prefix': None,
                        'protocol': None,
                        'port_range_max': None,
                        'port_range_min': None}
            self._assert_sg_rule_has_kvs(v4_egress, expected)

            # Verify default rule for v6 egress
            rules = [
                r for r in sg_rules
                if r['direction'] == 'egress' and r['ethertype'] == const.IPv6
            ]
            self.assertEqual(len(rules), 1)
            v6_egress = rules[0]

            expected = {'direction': 'egress',
                        'ethertype': const.IPv6,
                        'remote_group_id': None,
                        'remote_ip_prefix': None,
                        'protocol': None,
                        'port_range_max': None,
                        'port_range_min': None}
            self._assert_sg_rule_has_kvs(v6_egress, expected)

            # Verify default rule for v4 ingress
            rules = [
                r for r in sg_rules
                if r['direction'] == 'ingress' and r['ethertype'] == const.IPv4
            ]
            self.assertEqual(len(rules), 1)
            v4_ingress = rules[0]

            expected = {'direction': 'ingress',
                        'ethertype': const.IPv4,
                        'remote_group_id': security_group_id,
                        'remote_ip_prefix': None,
                        'protocol': None,
                        'port_range_max': None,
                        'port_range_min': None}
            self._assert_sg_rule_has_kvs(v4_ingress, expected)

            # Verify default rule for v6 ingress
            rules = [
                r for r in sg_rules
                if r['direction'] == 'ingress' and r['ethertype'] == const.IPv6
            ]
            self.assertEqual(len(rules), 1)
            v6_ingress = rules[0]

            expected = {'direction': 'ingress',
                        'ethertype': const.IPv6,
                        'remote_group_id': security_group_id,
                        'remote_ip_prefix': None,
                        'protocol': None,
                        'port_range_max': None,
                        'port_range_min': None}
            self._assert_sg_rule_has_kvs(v6_ingress, expected)

    # A rule created with a remote IP prefix echoes back every attribute.
    def test_create_security_group_rule_remote_ip_prefix(self):
        name = 'webservers'
        description = 'my webservers'
        with self.security_group(name, description) as sg:
            security_group_id = sg['security_group']['id']
            direction = "ingress"
            remote_ip_prefix = "10.0.0.0/24"
            protocol = const.PROTO_NAME_TCP
            port_range_min = 22
            port_range_max = 22
            keys = [('remote_ip_prefix', remote_ip_prefix),
                    ('security_group_id', security_group_id),
                    ('direction', direction),
                    ('protocol', protocol),
                    ('port_range_min', port_range_min),
                    ('port_range_max', port_range_max)]
            with self.security_group_rule(security_group_id, direction,
                                          protocol, port_range_min,
                                          port_range_max,
                                          remote_ip_prefix) as rule:
                for k, v, in keys:
                    self.assertEqual(rule['security_group_rule'][k], v)

    # A rule created with a remote group id echoes back every attribute.
    def test_create_security_group_rule_group_id(self):
        name = 'webservers'
        description = 'my webservers'
        with self.security_group(name, description) as sg:
            with self.security_group(name, description) as sg2:
                security_group_id = sg['security_group']['id']
                direction = "ingress"
                remote_group_id = sg2['security_group']['id']
                protocol = const.PROTO_NAME_TCP
                port_range_min = 22
                port_range_max = 22
                keys = [('remote_group_id', remote_group_id),
                        ('security_group_id', security_group_id),
                        ('direction', direction),
                        ('protocol', protocol),
                        ('port_range_min', port_range_min),
                        ('port_range_max', port_range_max)]
                with self.security_group_rule(security_group_id, direction,
                                              protocol, port_range_min,
                                              port_range_max,
                                              remote_group_id=remote_group_id
                                              ) as rule:
                    for k, v, in keys:
                        self.assertEqual(rule['security_group_rule'][k], v)

    def test_create_security_group_rule_icmp_with_type_and_code(self):
        name = 'webservers'
        description = 'my webservers'
        with self.security_group(name, description) as sg:
            security_group_id = sg['security_group']['id']
            direction = "ingress"
            remote_ip_prefix = "10.0.0.0/24"
            protocol = const.PROTO_NAME_ICMP
            # port_range_min (ICMP type) is greater than port_range_max
            # (ICMP code) in order to confirm min <= max port check is
            # not called for ICMP.
            port_range_min = 8
            port_range_max = 5
            keys = [('remote_ip_prefix', remote_ip_prefix),
                    ('security_group_id', security_group_id),
                    ('direction', direction),
                    ('protocol', protocol),
                    ('port_range_min', port_range_min),
                    ('port_range_max', port_range_max)]
            with self.security_group_rule(security_group_id, direction,
                                          protocol, port_range_min,
                                          port_range_max,
                                          remote_ip_prefix) as rule:
                for k, v, in keys:
                    self.assertEqual(rule['security_group_rule'][k], v)

    def test_create_security_group_rule_icmp_with_type_only(self):
        name = 'webservers'
        description = 'my webservers'
        with self.security_group(name, description) as sg:
            security_group_id = sg['security_group']['id']
            direction = "ingress"
            remote_ip_prefix = "10.0.0.0/24"
            protocol = const.PROTO_NAME_ICMP
            # ICMP type
            port_range_min = 8
            # ICMP code
            port_range_max = None
            keys = [('remote_ip_prefix', remote_ip_prefix),
                    ('security_group_id', security_group_id),
                    ('direction', direction),
                    ('protocol', protocol),
                    ('port_range_min', port_range_min),
                    ('port_range_max', port_range_max)]
            with self.security_group_rule(security_group_id, direction,
                                          protocol, port_range_min,
                                          port_range_max,
                                          remote_ip_prefix) as rule:
                for k, v, in keys:
                    self.assertEqual(rule['security_group_rule'][k], v)

    # Supplying both a remote IP prefix and a remote group id is expected
    # to be rejected with 400 Bad Request.
    def test_create_security_group_source_group_ip_and_ip_prefix(self):
        security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
        direction = "ingress"
        remote_ip_prefix = "10.0.0.0/24"
        protocol = const.PROTO_NAME_TCP
        port_range_min = 22
        port_range_max = 22
        remote_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087"
        rule = self._build_security_group_rule(security_group_id, direction,
                                               protocol, port_range_min,
                                               port_range_max,
                                               remote_ip_prefix,
                                               remote_group_id)
        res = self._create_security_group_rule(self.fmt, rule)
        self.deserialize(self.fmt, res)
        self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)

    # The security group id below is a fixed UUID that this test never
    # creates; 404 Not Found is expected.
    def test_create_security_group_rule_bad_security_group_id(self):
        security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
        direction = "ingress"
        remote_ip_prefix = "10.0.0.0/24"
        protocol = const.PROTO_NAME_TCP
        port_range_min = 22
        port_range_max = 22
        rule = self._build_security_group_rule(security_group_id, direction,
                                               protocol, port_range_min,
                                               port_range_max,
                                               remote_ip_prefix)
        res = self._create_security_group_rule(self.fmt, rule)
        self.deserialize(self.fmt, res)
        self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)

    # The rule names a tenant other than the one the group was created
    # with; 404 Not Found is expected.
    def test_create_security_group_rule_bad_tenant(self):
        with self.security_group() as sg:
            rule = {'security_group_rule':
                    {'security_group_id': sg['security_group']['id'],
                     'direction': 'ingress',
                     'protocol': const.PROTO_NAME_TCP,
                     'port_range_min': '22',
                     'port_range_max': '22',
                     'tenant_id': "bad_tenant"}}

            res = self._create_security_group_rule(self.fmt, rule)
            self.deserialize(self.fmt, res)
            self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)

    # 'bad_tenant' owns the target group (sg2) but references another
    # tenant's group (sg) as remote group; 404 Not Found is expected.
    def test_create_security_group_rule_bad_tenant_remote_group_id(self):
        with self.security_group() as sg:
            res = self._create_security_group(self.fmt, 'webservers',
                                              'webservers',
                                              tenant_id='bad_tenant')
            sg2 = self.deserialize(self.fmt, res)
            rule = {'security_group_rule':
                    {'security_group_id': sg2['security_group']['id'],
                     'direction': 'ingress',
                     'protocol': const.PROTO_NAME_TCP,
                     'port_range_min': '22',
                     'port_range_max': '22',
                     'tenant_id': 'bad_tenant',
                     'remote_group_id': sg['security_group']['id']}}

            res = self._create_security_group_rule(self.fmt, rule,
                                                   tenant_id='bad_tenant',
                                                   set_context=True)
            self.deserialize(self.fmt, res)
            self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)

    # 'bad_tenant' tries to add a rule to a group created with the default
    # tenant; 404 Not Found is expected.
    def test_create_security_group_rule_bad_tenant_security_group_rule(self):
        with self.security_group() as sg:
            res = self._create_security_group(self.fmt, 'webservers',
                                              'webservers',
                                              tenant_id='bad_tenant')
            self.deserialize(self.fmt, res)
            rule = {'security_group_rule':
                    {'security_group_id': sg['security_group']['id'],
                     'direction': 'ingress',
                     'protocol': const.PROTO_NAME_TCP,
                     'port_range_min': '22',
                     'port_range_max': '22',
                     'tenant_id': 'bad_tenant'}}

            res = self._create_security_group_rule(self.fmt, rule,
                                                   tenant_id='bad_tenant',
                                                   set_context=True)
            self.deserialize(self.fmt, res)
            self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)

    def test_create_security_group_rule_bad_remote_group_id(self):
        name = 'webservers'
        description = 'my webservers'
        # NOTE(review): the original indentation was lost; the request is
        # assumed to be issued while the group still exists -- confirm
        # against the upstream file.
        with self.security_group(name, description) as sg:
            security_group_id = sg['security_group']['id']
            remote_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
            direction = "ingress"
            protocol = const.PROTO_NAME_TCP
            port_range_min = 22
            port_range_max = 22
            rule = self._build_security_group_rule(
                security_group_id, direction,
                protocol, port_range_min,
                port_range_max,
                remote_group_id=remote_group_id)
            res = self._create_security_group_rule(self.fmt, rule)
            self.deserialize(self.fmt, res)
            self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)

    def test_create_security_group_rule_duplicate_rules(self):
        name = 'webservers'
        description = 'my webservers'
        with self.security_group(name, description) as sg:
            security_group_id = sg['security_group']['id']
            with self.security_group_rule(security_group_id):
                rule = self._build_security_group_rule(
                    sg['security_group']['id'], 'ingress',
                    const.PROTO_NAME_TCP, '22', '22')
                # The same rule body is posted twice; the second response
                # is the one checked for 409 Conflict.
                self._create_security_group_rule(self.fmt, rule)
                res = self._create_security_group_rule(self.fmt, rule)
                self.deserialize(self.fmt, res)
                self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)

    def test_create_security_group_rule_min_port_greater_max(self):
        name = 'webservers'
        description = 'my webservers'
        with self.security_group(name, description) as sg:
            security_group_id = sg['security_group']['id']
            with self.security_group_rule(security_group_id):
                # Check TCP and UDP, each given by name and by number.
                for protocol in [const.PROTO_NAME_TCP, const.PROTO_NAME_UDP,
                                 const.PROTO_NUM_TCP, const.PROTO_NUM_UDP]:
                    rule = self._build_security_group_rule(
                        sg['security_group']['id'],
                        'ingress', protocol, '50', '22')
                    self._create_security_group_rule(self.fmt, rule)
                    res = self._create_security_group_rule(self.fmt, rule)
                    self.deserialize(self.fmt, res)
                    self.assertEqual(res.status_int,
                                     webob.exc.HTTPBadRequest.code)

    # Ports without a protocol are expected to answer 400 Bad Request.
    def test_create_security_group_rule_ports_but_no_protocol(self):
        name = 'webservers'
        description = 'my webservers'
        with self.security_group(name, description) as sg:
            security_group_id = sg['security_group']['id']
            with self.security_group_rule(security_group_id):
                rule = self._build_security_group_rule(
                    sg['security_group']['id'], 'ingress', None, '22', '22')
                self._create_security_group_rule(self.fmt, rule)
                res = self._create_security_group_rule(self.fmt, rule)
                self.deserialize(self.fmt, res)
                self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)

    # Only port_range_min for TCP is expected to answer 400 Bad Request.
    def test_create_security_group_rule_port_range_min_only(self):
        name = 'webservers'
        description = 'my webservers'
        with self.security_group(name, description) as sg:
            security_group_id = sg['security_group']['id']
            with self.security_group_rule(security_group_id):
                rule = self._build_security_group_rule(
                    sg['security_group']['id'], 'ingress',
                    const.PROTO_NAME_TCP, '22', None)
                self._create_security_group_rule(self.fmt, rule)
                res = self._create_security_group_rule(self.fmt, rule)
                self.deserialize(self.fmt, res)
                self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)

    # Only port_range_max for TCP is expected to answer 400 Bad Request.
    def test_create_security_group_rule_port_range_max_only(self):
        name = 'webservers'
        description = 'my webservers'
        with self.security_group(name, description) as sg:
            security_group_id = sg['security_group']['id']
            with self.security_group_rule(security_group_id):
                rule = self._build_security_group_rule(
                    sg['security_group']['id'], 'ingress',
                    const.PROTO_NAME_TCP, None, '22')
                self._create_security_group_rule(self.fmt, rule)
                res = self._create_security_group_rule(self.fmt, rule)
                self.deserialize(self.fmt, res)
                self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)

    # An ICMP type of 256 is expected to answer 400 Bad Request.
    def test_create_security_group_rule_icmp_type_too_big(self):
        name = 'webservers'
        description = 'my webservers'
        with self.security_group(name, description) as sg:
            security_group_id = sg['security_group']['id']
            with self.security_group_rule(security_group_id):
                rule = self._build_security_group_rule(
                    sg['security_group']['id'], 'ingress',
                    const.PROTO_NAME_ICMP, '256', None)
                self._create_security_group_rule(self.fmt, rule)
                res = self._create_security_group_rule(self.fmt, rule)
                self.deserialize(self.fmt, res)
                self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)

    # An ICMP code of 256 is expected to answer 400 Bad Request.
    def test_create_security_group_rule_icmp_code_too_big(self):
        name = 'webservers'
        description = 'my webservers'
        with self.security_group(name, description) as sg:
            security_group_id = sg['security_group']['id']
            with self.security_group_rule(security_group_id):
                rule = self._build_security_group_rule(
                    sg['security_group']['id'], 'ingress',
                    const.PROTO_NAME_ICMP, '8', '256')
                self._create_security_group_rule(self.fmt, rule)
                res = self._create_security_group_rule(self.fmt, rule)
                self.deserialize(self.fmt, res)
                self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)

    # A port created without explicit security groups is expected to list
    # exactly one.
    def test_list_ports_security_group(self):
        with self.network() as n:
            with self.subnet(n):
                self._create_port(self.fmt, n['network']['id'])
                req = self.new_list_request('ports')
                res = req.get_response(self.api)
                ports = self.deserialize(self.fmt, res)
                port = ports['ports'][0]
                self.assertEqual(len(port[ext_sg.SECURITYGROUPS]), 1)
                # The port was not created via a fixture, so remove it here.
                self._delete('ports', port['id'])

    def test_list_security_group_rules(self):
        with self.security_group(name='sg') as sg:
            security_group_id = sg['security_group']['id']
            with contextlib.nested(self.security_group_rule(security_group_id,
                                                            direction='egress',
                                                            port_range_min=22,
                                                            port_range_max=22),
                                   self.security_group_rule(security_group_id,
                                                            direction='egress',
                                                            port_range_min=23,
                                                            port_range_max=23),
                                   self.security_group_rule(security_group_id,
                                                            direction='egress',
                                                            port_range_min=24,
                                                            port_range_max=24)
                                   ) as (sgr1, sgr2, sgr3):

                # Delete default rules as they would fail the following
                # assertion at the end.
                self._delete_default_security_group_egress_rules(
                    security_group_id)

                q = 'direction=egress&security_group_id=' + security_group_id
                self._test_list_resources('security-group-rule',
                                          [sgr1, sgr2, sgr3],
                                          query_params=q)

    def test_list_security_group_rules_with_sort(self):
        with self.security_group(name='sg') as sg:
            security_group_id = sg['security_group']['id']
            with contextlib.nested(self.security_group_rule(security_group_id,
                                                            direction='egress',
                                                            port_range_min=22,
                                                            port_range_max=22),
                                   self.security_group_rule(security_group_id,
                                                            direction='egress',
                                                            port_range_min=23,
                                                            port_range_max=23),
                                   self.security_group_rule(security_group_id,
                                                            direction='egress',
                                                            port_range_min=24,
                                                            port_range_max=24)
                                   ) as (sgr1, sgr2, sgr3):

                # Delete default rules as they would fail the following
                # assertion at the end.
                self._delete_default_security_group_egress_rules(
                    security_group_id)

                q = 'direction=egress&security_group_id=' + security_group_id
                self._test_list_with_sort('security-group-rule',
                                          (sgr3, sgr2, sgr1),
                                          [('port_range_max', 'desc')],
                                          query_params=q)

    def test_list_security_group_rules_with_pagination(self):
        with self.security_group(name='sg') as sg:
            security_group_id = sg['security_group']['id']
            with contextlib.nested(self.security_group_rule(security_group_id,
                                                            direction='egress',
                                                            port_range_min=22,
                                                            port_range_max=22),
                                   self.security_group_rule(security_group_id,
                                                            direction='egress',
                                                            port_range_min=23,
                                                            port_range_max=23),
                                   self.security_group_rule(security_group_id,
                                                            direction='egress',
                                                            port_range_min=24,
                                                            port_range_max=24)
                                   ) as (sgr1, sgr2, sgr3):

                # Delete default rules as they would fail the following
                # assertion at the end.
                self._delete_default_security_group_egress_rules(
                    security_group_id)

                q = 'direction=egress&security_group_id=' + security_group_id
                self._test_list_with_pagination(
                    'security-group-rule', (sgr3, sgr2, sgr1),
                    ('port_range_max', 'desc'), 2, 2,
                    query_params=q)

    def test_list_security_group_rules_with_pagination_reverse(self):
        with self.security_group(name='sg') as sg:
            security_group_id = sg['security_group']['id']
            with contextlib.nested(self.security_group_rule(security_group_id,
                                                            direction='egress',
                                                            port_range_min=22,
                                                            port_range_max=22),
                                   self.security_group_rule(security_group_id,
                                                            direction='egress',
                                                            port_range_min=23,
                                                            port_range_max=23),
                                   self.security_group_rule(security_group_id,
                                                            direction='egress',
                                                            port_range_min=24,
                                                            port_range_max=24)
                                   ) as (sgr1, sgr2, sgr3):
                # Unlike the sibling tests above, this one neither removes
                # the default egress rules nor filters by security group.
                self._test_list_with_pagination_reverse(
                    'security-group-rule', (sgr3, sgr2, sgr1),
                    ('port_range_max', 'desc'), 2, 2,
                    query_params='direction=egress')

    def test_update_port_with_security_group(self):
        with self.network() as n:
            with self.subnet(n):
                with self.security_group() as sg:
                    res = self._create_port(self.fmt, n['network']['id'])
                    port = self.deserialize(self.fmt, res)

                    data = {'port': {'fixed_ips': port['port']['fixed_ips'],
                                     'name': port['port']['name'],
                                     ext_sg.SECURITYGROUPS:
                                     [sg['security_group']['id']]}}

                    req = self.new_update_request('ports', data,
                                                  port['port']['id'])
                    res = self.deserialize(self.fmt,
                                           req.get_response(self.api))
                    self.assertEqual(res['port'][ext_sg.SECURITYGROUPS][0],
                                     sg['security_group']['id'])

                    # Test update port without security group
                    data = {'port': {'fixed_ips': port['port']['fixed_ips'],
                                     'name': port['port']['name']}}

                    req = self.new_update_request('ports', data,
                                                  port['port']['id'])
                    res = self.deserialize(self.fmt,
                                           req.get_response(self.api))
                    # The previously assigned group must still be bound.
                    self.assertEqual(res['port'][ext_sg.SECURITYGROUPS][0],
                                     sg['security_group']['id'])

                    self._delete('ports', port['port']['id'])

    # Despite its name this test only creates a port with two security
    # groups and checks that both are reported.
    def test_update_port_with_multiple_security_groups(self):
        with self.network() as n:
            with self.subnet(n):
                with self.security_group() as sg1:
                    with self.security_group() as sg2:
                        res = self._create_port(
                            self.fmt, n['network']['id'],
                            security_groups=[sg1['security_group']['id'],
                                             sg2['security_group']['id']])
                        port = self.deserialize(self.fmt, res)
                        self.assertEqual(len(
                            port['port'][ext_sg.SECURITYGROUPS]), 2)
                        self._delete('ports', port['port']['id'])

    # Updating a port with an empty security group list must clear them.
    def test_update_port_remove_security_group_empty_list(self):
        with self.network() as n:
            with self.subnet(n):
                with self.security_group() as sg:
                    res = self._create_port(self.fmt,
                                            n['network']['id'],
                                            security_groups=(
                                                [sg['security_group']['id']]))
                    port = self.deserialize(self.fmt, res)

                    data = {'port': {'fixed_ips': port['port']['fixed_ips'],
                                     'name': port['port']['name'],
                                     'security_groups': []}}

                    req = self.new_update_request('ports', data,
                                                  port['port']['id'])
                    res = self.deserialize(self.fmt,
                                           req.get_response(self.api))
                    self.assertEqual(res['port'].get(ext_sg.SECURITYGROUPS),
                                     [])
                    self._delete('ports', port['port']['id'])

    def test_update_port_remove_security_group_none(self):
        with self.network() as n:
            with self.subnet(n):
                with self.security_group() as sg:
                    res = self._create_port(self.fmt,
                                            n['network']['id'],
                                            security_groups=(
[sg['security_group']['id']])) port = self.deserialize(self.fmt, res) data = {'port': {'fixed_ips': port['port']['fixed_ips'], 'name': port['port']['name'], 'security_groups': None}} req = self.new_update_request('ports', data, port['port']['id']) res = self.deserialize(self.fmt, req.get_response(self.api)) self.assertEqual(res['port'].get(ext_sg.SECURITYGROUPS), []) self._delete('ports', port['port']['id']) def test_create_port_with_bad_security_group(self): with self.network() as n: with self.subnet(n): res = self._create_port(self.fmt, n['network']['id'], security_groups=['bad_id']) self.deserialize(self.fmt, res) self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code) def test_create_delete_security_group_port_in_use(self): with self.network() as n: with self.subnet(n): with self.security_group() as sg: res = self._create_port(self.fmt, n['network']['id'], security_groups=( [sg['security_group']['id']])) port = self.deserialize(self.fmt, res) self.assertEqual(port['port'][ext_sg.SECURITYGROUPS][0], sg['security_group']['id']) # try to delete security group that's in use res = self._delete('security-groups', sg['security_group']['id'], webob.exc.HTTPConflict.code) # delete the blocking port self._delete('ports', port['port']['id']) def test_create_security_group_rule_bulk_native(self): if self._skip_native_bulk: self.skipTest("Plugin does not support native bulk " "security_group_rule create") with self.security_group() as sg: rule1 = self._build_security_group_rule(sg['security_group']['id'], 'ingress', const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24') rule2 = self._build_security_group_rule(sg['security_group']['id'], 'ingress', const.PROTO_NAME_TCP, '23', '23', '10.0.0.1/24') rules = {'security_group_rules': [rule1['security_group_rule'], rule2['security_group_rule']]} res = self._create_security_group_rule(self.fmt, rules) self.deserialize(self.fmt, res) self.assertEqual(res.status_int, webob.exc.HTTPCreated.code) def 
test_create_security_group_rule_bulk_emulated(self): real_has_attr = hasattr #ensures the API choose the emulation code path def fakehasattr(item, attr): if attr.endswith('__native_bulk_support'): return False return real_has_attr(item, attr) with mock.patch('__builtin__.hasattr', new=fakehasattr): with self.security_group() as sg: rule1 = self._build_security_group_rule( sg['security_group']['id'], 'ingress', const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24') rule2 = self._build_security_group_rule( sg['security_group']['id'], 'ingress', const.PROTO_NAME_TCP, '23', '23', '10.0.0.1/24') rules = {'security_group_rules': [rule1['security_group_rule'], rule2['security_group_rule']] } res = self._create_security_group_rule(self.fmt, rules) self.deserialize(self.fmt, res) self.assertEqual(res.status_int, webob.exc.HTTPCreated.code) def test_create_security_group_rule_allow_all_ipv4(self): with self.security_group() as sg: rule = {'security_group_id': sg['security_group']['id'], 'direction': 'ingress', 'ethertype': 'IPv4', 'tenant_id': 'test_tenant'} res = self._create_security_group_rule( self.fmt, {'security_group_rule': rule}) rule = self.deserialize(self.fmt, res) self.assertEqual(res.status_int, webob.exc.HTTPCreated.code) def test_create_security_group_rule_allow_all_ipv4_v6_bulk(self): if self._skip_native_bulk: self.skipTest("Plugin does not support native bulk " "security_group_rule create") with self.security_group() as sg: rule_v4 = {'security_group_id': sg['security_group']['id'], 'direction': 'ingress', 'ethertype': 'IPv4', 'tenant_id': 'test_tenant'} rule_v6 = {'security_group_id': sg['security_group']['id'], 'direction': 'ingress', 'ethertype': 'IPv6', 'tenant_id': 'test_tenant'} rules = {'security_group_rules': [rule_v4, rule_v6]} res = self._create_security_group_rule(self.fmt, rules) self.deserialize(self.fmt, res) self.assertEqual(res.status_int, webob.exc.HTTPCreated.code) def test_create_security_group_rule_duplicate_rule_in_post(self): if 
self._skip_native_bulk: self.skipTest("Plugin does not support native bulk " "security_group_rule create") with self.security_group() as sg: rule = self._build_security_group_rule(sg['security_group']['id'], 'ingress', const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24') rules = {'security_group_rules': [rule['security_group_rule'], rule['security_group_rule']]} res = self._create_security_group_rule(self.fmt, rules) rule = self.deserialize(self.fmt, res) self.assertEqual(res.status_int, webob.exc.HTTPConflict.code) def test_create_security_group_rule_duplicate_rule_in_post_emulated(self): real_has_attr = hasattr #ensures the API choose the emulation code path def fakehasattr(item, attr): if attr.endswith('__native_bulk_support'): return False return real_has_attr(item, attr) with mock.patch('__builtin__.hasattr', new=fakehasattr): with self.security_group() as sg: rule = self._build_security_group_rule( sg['security_group']['id'], 'ingress', const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24') rules = {'security_group_rules': [rule['security_group_rule'], rule['security_group_rule']]} res = self._create_security_group_rule(self.fmt, rules) rule = self.deserialize(self.fmt, res) self.assertEqual(res.status_int, webob.exc.HTTPConflict.code) def test_create_security_group_rule_duplicate_rule_db(self): if self._skip_native_bulk: self.skipTest("Plugin does not support native bulk " "security_group_rule create") with self.security_group() as sg: rule = self._build_security_group_rule(sg['security_group']['id'], 'ingress', const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24') rules = {'security_group_rules': [rule]} self._create_security_group_rule(self.fmt, rules) res = self._create_security_group_rule(self.fmt, rules) rule = self.deserialize(self.fmt, res) self.assertEqual(res.status_int, webob.exc.HTTPConflict.code) def test_create_security_group_rule_duplicate_rule_db_emulated(self): real_has_attr = hasattr #ensures the API choose the emulation code path def fakehasattr(item, 
attr): if attr.endswith('__native_bulk_support'): return False return real_has_attr(item, attr) with mock.patch('__builtin__.hasattr', new=fakehasattr): with self.security_group() as sg: rule = self._build_security_group_rule( sg['security_group']['id'], 'ingress', const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24') rules = {'security_group_rules': [rule]} self._create_security_group_rule(self.fmt, rules) res = self._create_security_group_rule(self.fmt, rule) self.deserialize(self.fmt, res) self.assertEqual(res.status_int, webob.exc.HTTPConflict.code) def test_create_security_group_rule_different_security_group_ids(self): if self._skip_native_bulk: self.skipTest("Plugin does not support native bulk " "security_group_rule create") with self.security_group() as sg1: with self.security_group() as sg2: rule1 = self._build_security_group_rule( sg1['security_group']['id'], 'ingress', const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24') rule2 = self._build_security_group_rule( sg2['security_group']['id'], 'ingress', const.PROTO_NAME_TCP, '23', '23', '10.0.0.1/24') rules = {'security_group_rules': [rule1['security_group_rule'], rule2['security_group_rule']] } res = self._create_security_group_rule(self.fmt, rules) self.deserialize(self.fmt, res) self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code) def test_create_security_group_rule_with_invalid_ethertype(self): security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087" direction = "ingress" remote_ip_prefix = "10.0.0.0/24" protocol = const.PROTO_NAME_TCP port_range_min = 22 port_range_max = 22 remote_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087" rule = self._build_security_group_rule(security_group_id, direction, protocol, port_range_min, port_range_max, remote_ip_prefix, remote_group_id, ethertype='IPv5') res = self._create_security_group_rule(self.fmt, rule) self.deserialize(self.fmt, res) self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code) def 
test_create_security_group_rule_with_invalid_protocol(self): security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087" direction = "ingress" remote_ip_prefix = "10.0.0.0/24" protocol = 'tcp/ip' port_range_min = 22 port_range_max = 22 remote_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087" rule = self._build_security_group_rule(security_group_id, direction, protocol, port_range_min, port_range_max, remote_ip_prefix, remote_group_id) res = self._create_security_group_rule(self.fmt, rule) self.deserialize(self.fmt, res) self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code) def test_create_port_with_non_uuid(self): with self.network() as n: with self.subnet(n): res = self._create_port(self.fmt, n['network']['id'], security_groups=['not_valid']) self.deserialize(self.fmt, res) self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code) class TestSecurityGroupsXML(TestSecurityGroups): fmt = 'xml'