From 98eb4b338938b0f352a3301b2a1dd60f6613ac26 Mon Sep 17 00:00:00 2001 From: zhiyuan_cai Date: Thu, 3 Mar 2016 10:23:29 +0800 Subject: [PATCH] Implement security group - Neutron plugin part The task Neutron plugin needs to finish for security group functionality is simple. When a rule is created or deleted in a security group, it just updates the corresponding bottom security group. Also, creating or deleting a rule with "remote_group_id and changing rules in default security group will be rejected by plugin. Currently this task runs in synchronous way. Later it can be implemented in asynchronous way for better response time. Change-Id: Ibbf46c2e91382986c02324d86bc22887e93267eb --- tricircle/network/exceptions.py | 30 +++ tricircle/network/plugin.py | 6 +- tricircle/network/security_groups.py | 108 ++++++++ tricircle/tests/base.py | 10 +- tricircle/tests/unit/network/test_plugin.py | 156 ++++++++++- .../unit/network/test_security_groups.py | 243 ++++++++++++++++++ 6 files changed, 533 insertions(+), 20 deletions(-) create mode 100644 tricircle/network/exceptions.py create mode 100644 tricircle/network/security_groups.py create mode 100644 tricircle/tests/unit/network/test_security_groups.py diff --git a/tricircle/network/exceptions.py b/tricircle/network/exceptions.py new file mode 100644 index 0000000..d2b2812 --- /dev/null +++ b/tricircle/network/exceptions.py @@ -0,0 +1,30 @@ +# Copyright 2015 Huawei Technologies Co., Ltd. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron.common import exceptions + +from tricircle.common.i18n import _ + + +class RemoteGroupNotSupported(exceptions.InvalidInput): + message = _('Remote group not supported by Tricircle plugin') + + +class DefaultGroupUpdateNotSupported(exceptions.InvalidInput): + message = _('Default group update not supported by Tricircle plugin') + + +class BottomPodOperationFailure(exceptions.NeutronException): + message = _('Operation for %(resource)s on bottom pod %(pod_name)s fails') diff --git a/tricircle/network/plugin.py b/tricircle/network/plugin.py index fa75f10..5264143 100644 --- a/tricircle/network/plugin.py +++ b/tricircle/network/plugin.py @@ -31,7 +31,6 @@ from neutron.db import l3_agentschedulers_db # noqa from neutron.db import l3_db from neutron.db import models_v2 from neutron.db import portbindings_db -from neutron.db import securitygroups_db from neutron.db import sqlalchemyutils from neutron.extensions import availability_zone as az_ext from neutron.extensions import external_net @@ -54,6 +53,7 @@ from tricircle.common import xrpcapi import tricircle.db.api as db_api from tricircle.db import core from tricircle.db import models +from tricircle.network import security_groups tricircle_opts = [ @@ -78,7 +78,7 @@ class TricircleVlanTypeDriver(type_vlan.VlanTypeDriver): class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2, - securitygroups_db.SecurityGroupDbMixin, + security_groups.TricircleSecurityGroupMixin, external_net_db.External_net_db_mixin, portbindings_db.PortBindingMixin, extradhcpopt_db.ExtraDhcpOptMixin, @@ -122,7 +122,7 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2, @log_helpers.log_method_call def start_rpc_listeners(self): - pass + return [] # NOTE(zhiyuan) use later # self.topic = topics.PLUGIN # self.conn = n_rpc.create_connection(new=True) diff --git a/tricircle/network/security_groups.py b/tricircle/network/security_groups.py new file mode 100644 index 0000000..65cde2f --- /dev/null +++ b/tricircle/network/security_groups.py @@ -0,0 +1,108 @@ +# Copyright 2015 Huawei Technologies Co., Ltd. +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron.db import securitygroups_db +import neutronclient.common.exceptions as q_exceptions + +from tricircle.common import constants +from tricircle.common import context +import tricircle.db.api as db_api +import tricircle.network.exceptions as n_exceptions + + +class TricircleSecurityGroupMixin(securitygroups_db.SecurityGroupDbMixin): + + @staticmethod + def _safe_create_security_group_rule(t_context, client, body): + try: + client.create_security_group_rules(t_context, body) + except q_exceptions.Conflict: + return + + @staticmethod + def _safe_delete_security_group_rule(t_context, client, _id): + try: + client.delete_security_group_rules(t_context, _id) + except q_exceptions.NotFound: + return + + @staticmethod + def _compare_rule(rule1, rule2): + for key in ('direction', 'remote_ip_prefix', 'protocol', 'ethertype', + 'port_range_max', 'port_range_min'): + if rule1[key] != rule2[key]: + return False + return True + + def create_security_group_rule(self, q_context, security_group_rule): + rule = security_group_rule['security_group_rule'] + if rule['remote_group_id']: + raise n_exceptions.RemoteGroupNotSupported() + sg_id = rule['security_group_id'] + sg = self.get_security_group(q_context, sg_id) + if sg['name'] == 'default': + raise n_exceptions.DefaultGroupUpdateNotSupported() + + new_rule = super(TricircleSecurityGroupMixin, + self).create_security_group_rule(q_context, + security_group_rule) + + t_context = context.get_context_from_neutron_context(q_context) + mappings = db_api.get_bottom_mappings_by_top_id( + t_context, sg_id, constants.RT_SG) + + try: + for pod, b_sg_id in mappings: + client = self._get_client(pod['pod_name']) + rule['security_group_id'] = b_sg_id + self._safe_create_security_group_rule( + t_context, client, {'security_group_rule': rule}) + except Exception: + super(TricircleSecurityGroupMixin, + self).delete_security_group_rule(q_context, new_rule['id']) + raise n_exceptions.BottomPodOperationFailure( + resource='security group rule', pod_name=pod['pod_name']) + return new_rule + + def delete_security_group_rule(self, q_context, _id): + rule = self.get_security_group_rule(q_context, _id) + if rule['remote_group_id']: + raise n_exceptions.RemoteGroupNotSupported() + sg_id = rule['security_group_id'] + sg = self.get_security_group(q_context, sg_id) + if sg['name'] == 'default': + raise n_exceptions.DefaultGroupUpdateNotSupported() + + t_context = context.get_context_from_neutron_context(q_context) + mappings = db_api.get_bottom_mappings_by_top_id( + t_context, sg_id, constants.RT_SG) + + try: + for pod, b_sg_id in mappings: + client = self._get_client(pod['pod_name']) + rule['security_group_id'] = b_sg_id + b_sg = client.get_security_groups(t_context, b_sg_id) + for b_rule in b_sg['security_group_rules']: + if not self._compare_rule(b_rule, rule): + continue + self._safe_delete_security_group_rule(t_context, client, + b_rule['id']) + break + except Exception: + raise n_exceptions.BottomPodOperationFailure( + resource='security group rule', pod_name=pod['pod_name']) + + super(TricircleSecurityGroupMixin, + self).delete_security_group_rule(q_context, _id) diff --git a/tricircle/tests/base.py b/tricircle/tests/base.py index ffdf974..f9d3bcd 100644 --- a/tricircle/tests/base.py +++ b/tricircle/tests/base.py @@ -21,8 +21,10 @@ from oslotest import base class TestCase(base.BaseTestCase): """Test case base class for all unit tests.""" def setUp(self): - # neutron has a configuration option "bind_port" which conflicts with - # tricircle configuration option, so unregister this option before - # running tricircle tests - cfg.CONF.unregister_opts(config.core_opts) + # neutron has configuration options "api_extensions_path" and + # "bind_port" which conflicts with tricircle configuration option, + # so unregister this option before running tricircle tests + for opt in config.core_opts: + if opt.name == 'api_extensions_path' or opt.name == 'bind_port': + cfg.CONF.unregister_opt(opt) super(TestCase, self).setUp() diff --git a/tricircle/tests/unit/network/test_plugin.py b/tricircle/tests/unit/network/test_plugin.py index 7e95785..6a6aae5 100644 --- a/tricircle/tests/unit/network/test_plugin.py +++ b/tricircle/tests/unit/network/test_plugin.py @@ -43,6 +43,7 @@ import tricircle.db.api as db_api from tricircle.db import core from tricircle.db import models from tricircle.network import plugin +from tricircle.tests.unit.network import test_security_groups TOP_NETS = [] @@ -57,19 +58,26 @@ TOP_VLANALLOCATIONS = [] TOP_SEGMENTS = [] TOP_EXTNETS = [] TOP_FLOATINGIPS = [] +TOP_SGS = [] +TOP_SG_RULES = [] BOTTOM1_NETS = [] BOTTOM1_SUBNETS = [] BOTTOM1_PORTS = [] BOTTOM1_ROUTERS = [] +BOTTOM1_SGS = [] BOTTOM2_NETS = [] BOTTOM2_SUBNETS = [] BOTTOM2_PORTS = [] BOTTOM2_ROUTERS = [] +BOTTOM2_SGS = [] RES_LIST = [TOP_NETS, TOP_SUBNETS, TOP_PORTS, TOP_ROUTERS, TOP_ROUTERPORT, TOP_SUBNETPOOLS, TOP_SUBNETPOOLPREFIXES, TOP_IPALLOCATIONS, TOP_VLANALLOCATIONS, TOP_SEGMENTS, TOP_EXTNETS, TOP_FLOATINGIPS, + TOP_SGS, TOP_SG_RULES, BOTTOM1_NETS, BOTTOM1_SUBNETS, BOTTOM1_PORTS, BOTTOM1_ROUTERS, - BOTTOM2_NETS, BOTTOM2_SUBNETS, BOTTOM2_PORTS, BOTTOM2_ROUTERS] + BOTTOM1_SGS, + BOTTOM2_NETS, BOTTOM2_SUBNETS, BOTTOM2_PORTS, BOTTOM2_ROUTERS, + BOTTOM2_SGS] RES_MAP = {'networks': TOP_NETS, 'subnets': TOP_SUBNETS, 'ports': TOP_PORTS, @@ -81,7 +89,9 @@ RES_MAP = {'networks': TOP_NETS, 'ml2_vlan_allocations': TOP_VLANALLOCATIONS, 'ml2_network_segments': TOP_SEGMENTS, 'externalnetworks': TOP_EXTNETS, - 'floatingips': TOP_FLOATINGIPS} + 'floatingips': TOP_FLOATINGIPS, + 'securitygroups': TOP_SGS, + 'securitygrouprules': TOP_SG_RULES} class DotDict(dict): @@ -96,14 +106,8 @@ class DotDict(dict): class FakeNeutronClient(object): - _res_map = {'pod_1': {'network': BOTTOM1_NETS, - 'subnet': BOTTOM1_SUBNETS, - 'port': BOTTOM1_PORTS, - 'router': BOTTOM1_ROUTERS}, - 'pod_2': {'network': BOTTOM2_NETS, - 'subnet': BOTTOM2_SUBNETS, - 'port': BOTTOM2_PORTS, - 'router': BOTTOM2_ROUTERS}} + _res_map = {'pod_1': {'port': BOTTOM1_PORTS}, + 'pod_2': {'port': BOTTOM2_PORTS}} def __init__(self, pod_name): self.pod_name = pod_name @@ -148,11 +152,13 @@ class FakeClient(object): _res_map = {'pod_1': {'network': BOTTOM1_NETS, 'subnet': BOTTOM1_SUBNETS, 'port': BOTTOM1_PORTS, - 'router': BOTTOM1_ROUTERS}, + 'router': BOTTOM1_ROUTERS, + 'security_group': BOTTOM1_SGS}, 'pod_2': {'network': BOTTOM2_NETS, 'subnet': BOTTOM2_SUBNETS, 'port': BOTTOM2_PORTS, - 'router': BOTTOM2_ROUTERS}} + 'router': BOTTOM2_ROUTERS, + 'security_group': BOTTOM2_SGS}} def __init__(self, pod_name): self.pod_name = pod_name @@ -244,6 +250,41 @@ class FakeClient(object): # only for mock purpose pass + def create_security_group_rules(self, ctx, body): + sg_id = body['security_group_rule']['security_group_id'] + res_list = self._res_map[self.pod_name]['security_group'] + for sg in res_list: + if sg['id'] == sg_id: + target_sg = sg + new_rule = copy.copy(body['security_group_rule']) + match_found = False + for rule in target_sg['security_group_rules']: + old_rule = copy.copy(rule) + if new_rule == old_rule: + match_found = True + break + if match_found: + raise q_exceptions.Conflict() + target_sg['security_group_rules'].append(body['security_group_rule']) + + def delete_security_group_rules(self, ctx, rule_id): + res_list = self._res_map[self.pod_name]['security_group'] + for sg in res_list: + for rule in sg['security_group_rules']: + if rule['id'] == rule_id: + sg['security_group_rules'].remove(rule) + return + + def get_security_groups(self, ctx, sg_id): + res_list = self._res_map[self.pod_name]['security_group'] + for sg in res_list: + if sg['id'] == sg_id: + # need to do a deep copy because we will traverse the security + # group's 'security_group_rules' field and make change to the + # group + ret_sg = copy.deepcopy(sg) + return ret_sg + class FakeNeutronContext(object): def __init__(self): @@ -470,6 +511,9 @@ class FakeSession(object): 'ports', 'id', 'fixed_ips') link_models(model_obj, model_dict, 'subnets', 'network_id', 'networks', 'id', 'subnets') + link_models(model_obj, model_dict, + 'securitygrouprules', 'security_group_id', + 'securitygroups', 'id', 'security_group_rules') if model_obj.__tablename__ == 'routerports': for port in TOP_PORTS: @@ -566,6 +610,9 @@ class FakePlugin(plugin.TricirclePlugin): return ret return port + def _make_security_group_dict(self, security_group, fields=None): + return security_group + def fake_get_context_from_neutron_context(q_context): return context.get_db_context() @@ -608,7 +655,8 @@ def _allocate_specific_ip(context, subnet_id, ip_address): pass -class PluginTest(unittest.TestCase): +class PluginTest(unittest.TestCase, + test_security_groups.TricircleSecurityGroupTestMixin): def setUp(self): core.initialize() core.ModelBase.metadata.create_all(core.get_engine()) @@ -1689,6 +1737,88 @@ class PluginTest(unittest.TestCase): self.assertIsNone(TOP_FLOATINGIPS[0]['fixed_ip_address']) self.assertIsNone(TOP_FLOATINGIPS[0]['router_id']) + @patch.object(context, 'get_context_from_neutron_context') + def test_create_security_group_rule(self, mock_context): + self._basic_pod_route_setup() + + fake_plugin = FakePlugin() + q_ctx = FakeNeutronContext() + t_ctx = context.get_db_context() + mock_context.return_value = t_ctx + + self._test_create_security_group_rule(fake_plugin, q_ctx, t_ctx, + 'pod_id_1', TOP_SGS, BOTTOM1_SGS) + + @patch.object(context, 'get_context_from_neutron_context') + def test_delete_security_group_rule(self, mock_context): + self._basic_pod_route_setup() + + fake_plugin = FakePlugin() + q_ctx = FakeNeutronContext() + t_ctx = context.get_db_context() + mock_context.return_value = t_ctx + + self._test_delete_security_group_rule(fake_plugin, q_ctx, t_ctx, + 'pod_id_1', TOP_SGS, + TOP_SG_RULES, BOTTOM1_SGS) + + @patch.object(context, 'get_context_from_neutron_context') + def test_handle_remote_group_invalid_input(self, mock_context): + self._basic_pod_route_setup() + + fake_plugin = FakePlugin() + q_ctx = FakeNeutronContext() + t_ctx = context.get_db_context() + mock_context.return_value = t_ctx + + self._test_handle_remote_group_invalid_input(fake_plugin, q_ctx, t_ctx, + 'pod_id_1', TOP_SGS, + TOP_SG_RULES, BOTTOM1_SGS) + + @patch.object(context, 'get_context_from_neutron_context') + def test_handle_default_sg_invalid_input(self, mock_context): + self._basic_pod_route_setup() + + fake_plugin = FakePlugin() + q_ctx = FakeNeutronContext() + t_ctx = context.get_db_context() + mock_context.return_value = t_ctx + + self._test_handle_default_sg_invalid_input(fake_plugin, q_ctx, t_ctx, + 'pod_id_1', TOP_SGS, + TOP_SG_RULES, BOTTOM1_SGS) + + @patch.object(FakeClient, 'create_security_group_rules') + @patch.object(context, 'get_context_from_neutron_context') + def test_create_security_group_rule_exception(self, mock_context, + mock_create): + self._basic_pod_route_setup() + + fake_plugin = FakePlugin() + q_ctx = FakeNeutronContext() + t_ctx = context.get_db_context() + mock_context.return_value = t_ctx + mock_create.side_effect = q_exceptions.ConnectionFailed + + self._test_create_security_group_rule_exception( + fake_plugin, q_ctx, t_ctx, 'pod_id_1', TOP_SGS, BOTTOM1_SGS) + + @patch.object(FakeClient, 'delete_security_group_rules') + @patch.object(context, 'get_context_from_neutron_context') + def test_delete_security_group_rule_exception(self, mock_context, + mock_delete): + self._basic_pod_route_setup() + + fake_plugin = FakePlugin() + q_ctx = FakeNeutronContext() + t_ctx = context.get_db_context() + mock_context.return_value = t_ctx + mock_delete.side_effect = q_exceptions.ConnectionFailed + + self._test_delete_security_group_rule_exception( + fake_plugin, q_ctx, t_ctx, 'pod_id_1', TOP_SGS, TOP_SG_RULES, + BOTTOM1_SGS) + def tearDown(self): core.ModelBase.metadata.drop_all(core.get_engine()) for res in RES_LIST: diff --git a/tricircle/tests/unit/network/test_security_groups.py b/tricircle/tests/unit/network/test_security_groups.py new file mode 100644 index 0000000..398a39c --- /dev/null +++ b/tricircle/tests/unit/network/test_security_groups.py @@ -0,0 +1,243 @@ +# Copyright 2015 Huawei Technologies Co., Ltd. +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_utils import uuidutils + +from tricircle.common import constants +from tricircle.db import core +from tricircle.db import models +from tricircle.network import exceptions + + +class TricircleSecurityGroupTestMixin(object): + + @staticmethod + def _build_test_rule(_id, sg_id, project_id, ip_prefix, remote_group=None): + return {'security_group_id': sg_id, + 'id': _id, + 'tenant_id': project_id, + 'remote_group_id': remote_group, + 'direction': 'ingress', + 'remote_ip_prefix': ip_prefix, + 'protocol': None, + 'port_range_max': None, + 'port_range_min': None, + 'ethertype': 'IPv4'} + + def _test_create_security_group_rule(self, plugin, q_ctx, t_ctx, pod_id, + top_sgs, bottom1_sgs): + t_sg_id = uuidutils.generate_uuid() + t_rule_id = uuidutils.generate_uuid() + b_sg_id = uuidutils.generate_uuid() + project_id = 'test_prject_id' + t_sg = {'id': t_sg_id, 'name': 'test', 'description': '', + 'tenant_id': project_id, + 'security_group_rules': []} + b_sg = {'id': b_sg_id, 'name': t_sg_id, 'description': '', + 'tenant_id': project_id, + 'security_group_rules': []} + top_sgs.append(t_sg) + bottom1_sgs.append(b_sg) + route = { + 'top_id': t_sg_id, + 'pod_id': pod_id, + 'bottom_id': b_sg_id, + 'resource_type': constants.RT_SG} + with t_ctx.session.begin(): + core.create_resource(t_ctx, models.ResourceRouting, route) + + rule = { + 'security_group_rule': self._build_test_rule( + t_rule_id, t_sg_id, project_id, '10.0.0.0/24')} + plugin.create_security_group_rule(q_ctx, rule) + + self.assertEqual(1, len(bottom1_sgs[0]['security_group_rules'])) + b_rule = bottom1_sgs[0]['security_group_rules'][0] + self.assertEqual(b_sg_id, b_rule['security_group_id']) + rule['security_group_rule'].pop('security_group_id', None) + b_rule.pop('security_group_id', None) + self.assertEqual(rule['security_group_rule'], b_rule) + + def _test_delete_security_group_rule(self, plugin, q_ctx, t_ctx, pod_id, + top_sgs, top_rules, bottom1_sgs): + t_sg_id = uuidutils.generate_uuid() + t_rule1_id = uuidutils.generate_uuid() + t_rule2_id = uuidutils.generate_uuid() + b_sg_id = uuidutils.generate_uuid() + project_id = 'test_prject_id' + t_rule1 = self._build_test_rule( + t_rule1_id, t_sg_id, project_id, '10.0.1.0/24') + t_rule2 = self._build_test_rule( + t_rule2_id, t_sg_id, project_id, '10.0.2.0/24') + b_rule1 = self._build_test_rule( + t_rule1_id, b_sg_id, project_id, '10.0.1.0/24') + b_rule2 = self._build_test_rule( + t_rule2_id, b_sg_id, project_id, '10.0.2.0/24') + t_sg = {'id': t_sg_id, 'name': 'test', 'description': '', + 'tenant_id': project_id, + 'security_group_rules': [t_rule1, t_rule2]} + b_sg = {'id': b_sg_id, 'name': t_sg_id, 'description': '', + 'tenant_id': project_id, + 'security_group_rules': [b_rule1, b_rule2]} + top_sgs.append(t_sg) + top_rules.append(t_rule1) + top_rules.append(t_rule2) + bottom1_sgs.append(b_sg) + route = { + 'top_id': t_sg_id, + 'pod_id': pod_id, + 'bottom_id': b_sg_id, + 'resource_type': constants.RT_SG} + with t_ctx.session.begin(): + core.create_resource(t_ctx, models.ResourceRouting, route) + + plugin.delete_security_group_rule(q_ctx, t_rule1_id) + + self.assertEqual(1, len(bottom1_sgs[0]['security_group_rules'])) + b_rule = bottom1_sgs[0]['security_group_rules'][0] + self.assertEqual(b_sg_id, b_rule['security_group_id']) + t_rule2.pop('security_group_id', None) + b_rule.pop('security_group_id', None) + self.assertEqual(t_rule2, b_rule) + + def _test_handle_remote_group_invalid_input(self, plugin, q_ctx, t_ctx, + pod_id, top_sgs, top_rules, + bottom1_sgs): + t_sg1_id = uuidutils.generate_uuid() + t_sg2_id = uuidutils.generate_uuid() + t_rule1_id = uuidutils.generate_uuid() + t_rule2_id = uuidutils.generate_uuid() + b_sg_id = uuidutils.generate_uuid() + project_id = 'test_prject_id' + t_rule1 = self._build_test_rule( + t_rule1_id, t_sg1_id, project_id, None, t_sg1_id) + t_rule2 = self._build_test_rule( + t_rule2_id, t_sg1_id, project_id, None, t_sg2_id) + t_sg = {'id': t_sg1_id, 'name': 'test', 'description': '', + 'tenant_id': project_id, + 'security_group_rules': []} + b_sg = {'id': b_sg_id, 'name': t_sg1_id, 'description': '', + 'tenant_id': project_id, + 'security_group_rules': []} + top_sgs.append(t_sg) + top_rules.append(t_rule1) + bottom1_sgs.append(b_sg) + route = { + 'top_id': t_sg1_id, + 'pod_id': pod_id, + 'bottom_id': b_sg_id, + 'resource_type': constants.RT_SG} + with t_ctx.session.begin(): + core.create_resource(t_ctx, models.ResourceRouting, route) + + self.assertRaises(exceptions.RemoteGroupNotSupported, + plugin.create_security_group_rule, q_ctx, + {'security_group_rule': t_rule2}) + self.assertRaises(exceptions.RemoteGroupNotSupported, + plugin.delete_security_group_rule, q_ctx, t_rule1_id) + + def _test_handle_default_sg_invalid_input(self, plugin, q_ctx, t_ctx, + pod_id, top_sgs, top_rules, + bottom1_sgs): + t_sg_id = uuidutils.generate_uuid() + t_rule1_id = uuidutils.generate_uuid() + t_rule2_id = uuidutils.generate_uuid() + b_sg_id = uuidutils.generate_uuid() + project_id = 'test_prject_id' + t_rule1 = self._build_test_rule( + t_rule1_id, t_sg_id, project_id, '10.0.0.0/24') + t_rule2 = self._build_test_rule( + t_rule2_id, t_sg_id, project_id, '10.0.1.0/24') + t_sg = {'id': t_sg_id, 'name': 'default', 'description': '', + 'tenant_id': project_id, + 'security_group_rules': [t_rule1]} + b_sg = {'id': b_sg_id, 'name': t_sg_id, 'description': '', + 'tenant_id': project_id, + 'security_group_rules': []} + top_sgs.append(t_sg) + top_rules.append(t_rule1) + bottom1_sgs.append(b_sg) + route1 = { + 'top_id': t_sg_id, + 'pod_id': pod_id, + 'bottom_id': b_sg_id, + 'resource_type': constants.RT_SG} + with t_ctx.session.begin(): + core.create_resource(t_ctx, models.ResourceRouting, route1) + + self.assertRaises(exceptions.DefaultGroupUpdateNotSupported, + plugin.create_security_group_rule, q_ctx, + {'security_group_rule': t_rule2}) + self.assertRaises(exceptions.DefaultGroupUpdateNotSupported, + plugin.delete_security_group_rule, q_ctx, t_rule1_id) + + def _test_create_security_group_rule_exception( + self, plugin, q_ctx, t_ctx, pod_id, top_sgs, bottom1_sgs): + t_sg_id = uuidutils.generate_uuid() + t_rule_id = uuidutils.generate_uuid() + b_sg_id = uuidutils.generate_uuid() + project_id = 'test_prject_id' + t_sg = {'id': t_sg_id, 'name': 'test', 'description': '', + 'tenant_id': project_id, + 'security_group_rules': []} + b_sg = {'id': b_sg_id, 'name': t_sg_id, 'description': '', + 'tenant_id': project_id, + 'security_group_rules': []} + top_sgs.append(t_sg) + bottom1_sgs.append(b_sg) + route = { + 'top_id': t_sg_id, + 'pod_id': pod_id, + 'bottom_id': b_sg_id, + 'resource_type': constants.RT_SG} + with t_ctx.session.begin(): + core.create_resource(t_ctx, models.ResourceRouting, route) + + rule = { + 'security_group_rule': self._build_test_rule( + t_rule_id, t_sg_id, project_id, '10.0.0.0/24')} + self.assertRaises(exceptions.BottomPodOperationFailure, + plugin.create_security_group_rule, q_ctx, rule) + + def _test_delete_security_group_rule_exception(self, plugin, q_ctx, t_ctx, + pod_id, top_sgs, top_rules, + bottom1_sgs): + t_sg_id = uuidutils.generate_uuid() + t_rule_id = uuidutils.generate_uuid() + b_sg_id = uuidutils.generate_uuid() + project_id = 'test_prject_id' + t_rule = self._build_test_rule( + t_rule_id, t_sg_id, project_id, '10.0.1.0/24') + b_rule = self._build_test_rule( + t_rule_id, b_sg_id, project_id, '10.0.1.0/24') + t_sg = {'id': t_sg_id, 'name': 'test', 'description': '', + 'tenant_id': project_id, + 'security_group_rules': [t_rule]} + b_sg = {'id': b_sg_id, 'name': t_sg_id, 'description': '', + 'tenant_id': project_id, + 'security_group_rules': [b_rule]} + top_sgs.append(t_sg) + top_rules.append(t_rule) + bottom1_sgs.append(b_sg) + route = { + 'top_id': t_sg_id, + 'pod_id': pod_id, + 'bottom_id': b_sg_id, + 'resource_type': constants.RT_SG} + with t_ctx.session.begin(): + core.create_resource(t_ctx, models.ResourceRouting, route) + + self.assertRaises(exceptions.BottomPodOperationFailure, + plugin.delete_security_group_rule, q_ctx, t_rule_id)