Implement security group - Neutron plugin part

The task Neutron plugin needs to finish for security group
functionality is simple. When a rule is created or deleted
in a security group, it just updates the corresponding
bottom security group. Also, creating or deleting a rule
with "remote_group_id and changing rules in default
security group will be rejected by plugin.

Currently this task runs in synchronous way. Later it can
be implemented in asynchronous way for better response time.

Change-Id: Ibbf46c2e91382986c02324d86bc22887e93267eb
This commit is contained in:
zhiyuan_cai 2016-03-03 10:23:29 +08:00
parent 8c0e8276c3
commit 98eb4b3389
6 changed files with 533 additions and 20 deletions

View File

@ -0,0 +1,30 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.common import exceptions
from tricircle.common.i18n import _
class RemoteGroupNotSupported(exceptions.InvalidInput):
message = _('Remote group not supported by Tricircle plugin')
class DefaultGroupUpdateNotSupported(exceptions.InvalidInput):
message = _('Default group update not supported by Tricircle plugin')
class BottomPodOperationFailure(exceptions.NeutronException):
message = _('Operation for %(resource)s on bottom pod %(pod_name)s fails')

View File

@ -31,7 +31,6 @@ from neutron.db import l3_agentschedulers_db # noqa
from neutron.db import l3_db from neutron.db import l3_db
from neutron.db import models_v2 from neutron.db import models_v2
from neutron.db import portbindings_db from neutron.db import portbindings_db
from neutron.db import securitygroups_db
from neutron.db import sqlalchemyutils from neutron.db import sqlalchemyutils
from neutron.extensions import availability_zone as az_ext from neutron.extensions import availability_zone as az_ext
from neutron.extensions import external_net from neutron.extensions import external_net
@ -54,6 +53,7 @@ from tricircle.common import xrpcapi
import tricircle.db.api as db_api import tricircle.db.api as db_api
from tricircle.db import core from tricircle.db import core
from tricircle.db import models from tricircle.db import models
from tricircle.network import security_groups
tricircle_opts = [ tricircle_opts = [
@ -78,7 +78,7 @@ class TricircleVlanTypeDriver(type_vlan.VlanTypeDriver):
class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2, class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2,
securitygroups_db.SecurityGroupDbMixin, security_groups.TricircleSecurityGroupMixin,
external_net_db.External_net_db_mixin, external_net_db.External_net_db_mixin,
portbindings_db.PortBindingMixin, portbindings_db.PortBindingMixin,
extradhcpopt_db.ExtraDhcpOptMixin, extradhcpopt_db.ExtraDhcpOptMixin,
@ -122,7 +122,7 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2,
@log_helpers.log_method_call @log_helpers.log_method_call
def start_rpc_listeners(self): def start_rpc_listeners(self):
pass return []
# NOTE(zhiyuan) use later # NOTE(zhiyuan) use later
# self.topic = topics.PLUGIN # self.topic = topics.PLUGIN
# self.conn = n_rpc.create_connection(new=True) # self.conn = n_rpc.create_connection(new=True)

View File

@ -0,0 +1,108 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.db import securitygroups_db
import neutronclient.common.exceptions as q_exceptions
from tricircle.common import constants
from tricircle.common import context
import tricircle.db.api as db_api
import tricircle.network.exceptions as n_exceptions
class TricircleSecurityGroupMixin(securitygroups_db.SecurityGroupDbMixin):
@staticmethod
def _safe_create_security_group_rule(t_context, client, body):
try:
client.create_security_group_rules(t_context, body)
except q_exceptions.Conflict:
return
@staticmethod
def _safe_delete_security_group_rule(t_context, client, _id):
try:
client.delete_security_group_rules(t_context, _id)
except q_exceptions.NotFound:
return
@staticmethod
def _compare_rule(rule1, rule2):
for key in ('direction', 'remote_ip_prefix', 'protocol', 'ethertype',
'port_range_max', 'port_range_min'):
if rule1[key] != rule2[key]:
return False
return True
def create_security_group_rule(self, q_context, security_group_rule):
rule = security_group_rule['security_group_rule']
if rule['remote_group_id']:
raise n_exceptions.RemoteGroupNotSupported()
sg_id = rule['security_group_id']
sg = self.get_security_group(q_context, sg_id)
if sg['name'] == 'default':
raise n_exceptions.DefaultGroupUpdateNotSupported()
new_rule = super(TricircleSecurityGroupMixin,
self).create_security_group_rule(q_context,
security_group_rule)
t_context = context.get_context_from_neutron_context(q_context)
mappings = db_api.get_bottom_mappings_by_top_id(
t_context, sg_id, constants.RT_SG)
try:
for pod, b_sg_id in mappings:
client = self._get_client(pod['pod_name'])
rule['security_group_id'] = b_sg_id
self._safe_create_security_group_rule(
t_context, client, {'security_group_rule': rule})
except Exception:
super(TricircleSecurityGroupMixin,
self).delete_security_group_rule(q_context, new_rule['id'])
raise n_exceptions.BottomPodOperationFailure(
resource='security group rule', pod_name=pod['pod_name'])
return new_rule
def delete_security_group_rule(self, q_context, _id):
rule = self.get_security_group_rule(q_context, _id)
if rule['remote_group_id']:
raise n_exceptions.RemoteGroupNotSupported()
sg_id = rule['security_group_id']
sg = self.get_security_group(q_context, sg_id)
if sg['name'] == 'default':
raise n_exceptions.DefaultGroupUpdateNotSupported()
t_context = context.get_context_from_neutron_context(q_context)
mappings = db_api.get_bottom_mappings_by_top_id(
t_context, sg_id, constants.RT_SG)
try:
for pod, b_sg_id in mappings:
client = self._get_client(pod['pod_name'])
rule['security_group_id'] = b_sg_id
b_sg = client.get_security_groups(t_context, b_sg_id)
for b_rule in b_sg['security_group_rules']:
if not self._compare_rule(b_rule, rule):
continue
self._safe_delete_security_group_rule(t_context, client,
b_rule['id'])
break
except Exception:
raise n_exceptions.BottomPodOperationFailure(
resource='security group rule', pod_name=pod['pod_name'])
super(TricircleSecurityGroupMixin,
self).delete_security_group_rule(q_context, _id)

View File

@ -21,8 +21,10 @@ from oslotest import base
class TestCase(base.BaseTestCase): class TestCase(base.BaseTestCase):
"""Test case base class for all unit tests.""" """Test case base class for all unit tests."""
def setUp(self): def setUp(self):
# neutron has a configuration option "bind_port" which conflicts with # neutron has configuration options "api_extensions_path" and
# tricircle configuration option, so unregister this option before # "bind_port" which conflicts with tricircle configuration option,
# running tricircle tests # so unregister this option before running tricircle tests
cfg.CONF.unregister_opts(config.core_opts) for opt in config.core_opts:
if opt.name == 'api_extensions_path' or opt.name == 'bind_port':
cfg.CONF.unregister_opt(opt)
super(TestCase, self).setUp() super(TestCase, self).setUp()

View File

@ -43,6 +43,7 @@ import tricircle.db.api as db_api
from tricircle.db import core from tricircle.db import core
from tricircle.db import models from tricircle.db import models
from tricircle.network import plugin from tricircle.network import plugin
from tricircle.tests.unit.network import test_security_groups
TOP_NETS = [] TOP_NETS = []
@ -57,19 +58,26 @@ TOP_VLANALLOCATIONS = []
TOP_SEGMENTS = [] TOP_SEGMENTS = []
TOP_EXTNETS = [] TOP_EXTNETS = []
TOP_FLOATINGIPS = [] TOP_FLOATINGIPS = []
TOP_SGS = []
TOP_SG_RULES = []
BOTTOM1_NETS = [] BOTTOM1_NETS = []
BOTTOM1_SUBNETS = [] BOTTOM1_SUBNETS = []
BOTTOM1_PORTS = [] BOTTOM1_PORTS = []
BOTTOM1_ROUTERS = [] BOTTOM1_ROUTERS = []
BOTTOM1_SGS = []
BOTTOM2_NETS = [] BOTTOM2_NETS = []
BOTTOM2_SUBNETS = [] BOTTOM2_SUBNETS = []
BOTTOM2_PORTS = [] BOTTOM2_PORTS = []
BOTTOM2_ROUTERS = [] BOTTOM2_ROUTERS = []
BOTTOM2_SGS = []
RES_LIST = [TOP_NETS, TOP_SUBNETS, TOP_PORTS, TOP_ROUTERS, TOP_ROUTERPORT, RES_LIST = [TOP_NETS, TOP_SUBNETS, TOP_PORTS, TOP_ROUTERS, TOP_ROUTERPORT,
TOP_SUBNETPOOLS, TOP_SUBNETPOOLPREFIXES, TOP_IPALLOCATIONS, TOP_SUBNETPOOLS, TOP_SUBNETPOOLPREFIXES, TOP_IPALLOCATIONS,
TOP_VLANALLOCATIONS, TOP_SEGMENTS, TOP_EXTNETS, TOP_FLOATINGIPS, TOP_VLANALLOCATIONS, TOP_SEGMENTS, TOP_EXTNETS, TOP_FLOATINGIPS,
TOP_SGS, TOP_SG_RULES,
BOTTOM1_NETS, BOTTOM1_SUBNETS, BOTTOM1_PORTS, BOTTOM1_ROUTERS, BOTTOM1_NETS, BOTTOM1_SUBNETS, BOTTOM1_PORTS, BOTTOM1_ROUTERS,
BOTTOM2_NETS, BOTTOM2_SUBNETS, BOTTOM2_PORTS, BOTTOM2_ROUTERS] BOTTOM1_SGS,
BOTTOM2_NETS, BOTTOM2_SUBNETS, BOTTOM2_PORTS, BOTTOM2_ROUTERS,
BOTTOM2_SGS]
RES_MAP = {'networks': TOP_NETS, RES_MAP = {'networks': TOP_NETS,
'subnets': TOP_SUBNETS, 'subnets': TOP_SUBNETS,
'ports': TOP_PORTS, 'ports': TOP_PORTS,
@ -81,7 +89,9 @@ RES_MAP = {'networks': TOP_NETS,
'ml2_vlan_allocations': TOP_VLANALLOCATIONS, 'ml2_vlan_allocations': TOP_VLANALLOCATIONS,
'ml2_network_segments': TOP_SEGMENTS, 'ml2_network_segments': TOP_SEGMENTS,
'externalnetworks': TOP_EXTNETS, 'externalnetworks': TOP_EXTNETS,
'floatingips': TOP_FLOATINGIPS} 'floatingips': TOP_FLOATINGIPS,
'securitygroups': TOP_SGS,
'securitygrouprules': TOP_SG_RULES}
class DotDict(dict): class DotDict(dict):
@ -96,14 +106,8 @@ class DotDict(dict):
class FakeNeutronClient(object): class FakeNeutronClient(object):
_res_map = {'pod_1': {'network': BOTTOM1_NETS, _res_map = {'pod_1': {'port': BOTTOM1_PORTS},
'subnet': BOTTOM1_SUBNETS, 'pod_2': {'port': BOTTOM2_PORTS}}
'port': BOTTOM1_PORTS,
'router': BOTTOM1_ROUTERS},
'pod_2': {'network': BOTTOM2_NETS,
'subnet': BOTTOM2_SUBNETS,
'port': BOTTOM2_PORTS,
'router': BOTTOM2_ROUTERS}}
def __init__(self, pod_name): def __init__(self, pod_name):
self.pod_name = pod_name self.pod_name = pod_name
@ -148,11 +152,13 @@ class FakeClient(object):
_res_map = {'pod_1': {'network': BOTTOM1_NETS, _res_map = {'pod_1': {'network': BOTTOM1_NETS,
'subnet': BOTTOM1_SUBNETS, 'subnet': BOTTOM1_SUBNETS,
'port': BOTTOM1_PORTS, 'port': BOTTOM1_PORTS,
'router': BOTTOM1_ROUTERS}, 'router': BOTTOM1_ROUTERS,
'security_group': BOTTOM1_SGS},
'pod_2': {'network': BOTTOM2_NETS, 'pod_2': {'network': BOTTOM2_NETS,
'subnet': BOTTOM2_SUBNETS, 'subnet': BOTTOM2_SUBNETS,
'port': BOTTOM2_PORTS, 'port': BOTTOM2_PORTS,
'router': BOTTOM2_ROUTERS}} 'router': BOTTOM2_ROUTERS,
'security_group': BOTTOM2_SGS}}
def __init__(self, pod_name): def __init__(self, pod_name):
self.pod_name = pod_name self.pod_name = pod_name
@ -244,6 +250,41 @@ class FakeClient(object):
# only for mock purpose # only for mock purpose
pass pass
def create_security_group_rules(self, ctx, body):
sg_id = body['security_group_rule']['security_group_id']
res_list = self._res_map[self.pod_name]['security_group']
for sg in res_list:
if sg['id'] == sg_id:
target_sg = sg
new_rule = copy.copy(body['security_group_rule'])
match_found = False
for rule in target_sg['security_group_rules']:
old_rule = copy.copy(rule)
if new_rule == old_rule:
match_found = True
break
if match_found:
raise q_exceptions.Conflict()
target_sg['security_group_rules'].append(body['security_group_rule'])
def delete_security_group_rules(self, ctx, rule_id):
res_list = self._res_map[self.pod_name]['security_group']
for sg in res_list:
for rule in sg['security_group_rules']:
if rule['id'] == rule_id:
sg['security_group_rules'].remove(rule)
return
def get_security_groups(self, ctx, sg_id):
res_list = self._res_map[self.pod_name]['security_group']
for sg in res_list:
if sg['id'] == sg_id:
# need to do a deep copy because we will traverse the security
# group's 'security_group_rules' field and make change to the
# group
ret_sg = copy.deepcopy(sg)
return ret_sg
class FakeNeutronContext(object): class FakeNeutronContext(object):
def __init__(self): def __init__(self):
@ -470,6 +511,9 @@ class FakeSession(object):
'ports', 'id', 'fixed_ips') 'ports', 'id', 'fixed_ips')
link_models(model_obj, model_dict, link_models(model_obj, model_dict,
'subnets', 'network_id', 'networks', 'id', 'subnets') 'subnets', 'network_id', 'networks', 'id', 'subnets')
link_models(model_obj, model_dict,
'securitygrouprules', 'security_group_id',
'securitygroups', 'id', 'security_group_rules')
if model_obj.__tablename__ == 'routerports': if model_obj.__tablename__ == 'routerports':
for port in TOP_PORTS: for port in TOP_PORTS:
@ -566,6 +610,9 @@ class FakePlugin(plugin.TricirclePlugin):
return ret return ret
return port return port
def _make_security_group_dict(self, security_group, fields=None):
return security_group
def fake_get_context_from_neutron_context(q_context): def fake_get_context_from_neutron_context(q_context):
return context.get_db_context() return context.get_db_context()
@ -608,7 +655,8 @@ def _allocate_specific_ip(context, subnet_id, ip_address):
pass pass
class PluginTest(unittest.TestCase): class PluginTest(unittest.TestCase,
test_security_groups.TricircleSecurityGroupTestMixin):
def setUp(self): def setUp(self):
core.initialize() core.initialize()
core.ModelBase.metadata.create_all(core.get_engine()) core.ModelBase.metadata.create_all(core.get_engine())
@ -1689,6 +1737,88 @@ class PluginTest(unittest.TestCase):
self.assertIsNone(TOP_FLOATINGIPS[0]['fixed_ip_address']) self.assertIsNone(TOP_FLOATINGIPS[0]['fixed_ip_address'])
self.assertIsNone(TOP_FLOATINGIPS[0]['router_id']) self.assertIsNone(TOP_FLOATINGIPS[0]['router_id'])
@patch.object(context, 'get_context_from_neutron_context')
def test_create_security_group_rule(self, mock_context):
self._basic_pod_route_setup()
fake_plugin = FakePlugin()
q_ctx = FakeNeutronContext()
t_ctx = context.get_db_context()
mock_context.return_value = t_ctx
self._test_create_security_group_rule(fake_plugin, q_ctx, t_ctx,
'pod_id_1', TOP_SGS, BOTTOM1_SGS)
@patch.object(context, 'get_context_from_neutron_context')
def test_delete_security_group_rule(self, mock_context):
self._basic_pod_route_setup()
fake_plugin = FakePlugin()
q_ctx = FakeNeutronContext()
t_ctx = context.get_db_context()
mock_context.return_value = t_ctx
self._test_delete_security_group_rule(fake_plugin, q_ctx, t_ctx,
'pod_id_1', TOP_SGS,
TOP_SG_RULES, BOTTOM1_SGS)
@patch.object(context, 'get_context_from_neutron_context')
def test_handle_remote_group_invalid_input(self, mock_context):
self._basic_pod_route_setup()
fake_plugin = FakePlugin()
q_ctx = FakeNeutronContext()
t_ctx = context.get_db_context()
mock_context.return_value = t_ctx
self._test_handle_remote_group_invalid_input(fake_plugin, q_ctx, t_ctx,
'pod_id_1', TOP_SGS,
TOP_SG_RULES, BOTTOM1_SGS)
@patch.object(context, 'get_context_from_neutron_context')
def test_handle_default_sg_invalid_input(self, mock_context):
self._basic_pod_route_setup()
fake_plugin = FakePlugin()
q_ctx = FakeNeutronContext()
t_ctx = context.get_db_context()
mock_context.return_value = t_ctx
self._test_handle_default_sg_invalid_input(fake_plugin, q_ctx, t_ctx,
'pod_id_1', TOP_SGS,
TOP_SG_RULES, BOTTOM1_SGS)
@patch.object(FakeClient, 'create_security_group_rules')
@patch.object(context, 'get_context_from_neutron_context')
def test_create_security_group_rule_exception(self, mock_context,
mock_create):
self._basic_pod_route_setup()
fake_plugin = FakePlugin()
q_ctx = FakeNeutronContext()
t_ctx = context.get_db_context()
mock_context.return_value = t_ctx
mock_create.side_effect = q_exceptions.ConnectionFailed
self._test_create_security_group_rule_exception(
fake_plugin, q_ctx, t_ctx, 'pod_id_1', TOP_SGS, BOTTOM1_SGS)
@patch.object(FakeClient, 'delete_security_group_rules')
@patch.object(context, 'get_context_from_neutron_context')
def test_delete_security_group_rule_exception(self, mock_context,
mock_delete):
self._basic_pod_route_setup()
fake_plugin = FakePlugin()
q_ctx = FakeNeutronContext()
t_ctx = context.get_db_context()
mock_context.return_value = t_ctx
mock_delete.side_effect = q_exceptions.ConnectionFailed
self._test_delete_security_group_rule_exception(
fake_plugin, q_ctx, t_ctx, 'pod_id_1', TOP_SGS, TOP_SG_RULES,
BOTTOM1_SGS)
def tearDown(self): def tearDown(self):
core.ModelBase.metadata.drop_all(core.get_engine()) core.ModelBase.metadata.drop_all(core.get_engine())
for res in RES_LIST: for res in RES_LIST:

View File

@ -0,0 +1,243 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import uuidutils
from tricircle.common import constants
from tricircle.db import core
from tricircle.db import models
from tricircle.network import exceptions
class TricircleSecurityGroupTestMixin(object):
@staticmethod
def _build_test_rule(_id, sg_id, project_id, ip_prefix, remote_group=None):
return {'security_group_id': sg_id,
'id': _id,
'tenant_id': project_id,
'remote_group_id': remote_group,
'direction': 'ingress',
'remote_ip_prefix': ip_prefix,
'protocol': None,
'port_range_max': None,
'port_range_min': None,
'ethertype': 'IPv4'}
def _test_create_security_group_rule(self, plugin, q_ctx, t_ctx, pod_id,
top_sgs, bottom1_sgs):
t_sg_id = uuidutils.generate_uuid()
t_rule_id = uuidutils.generate_uuid()
b_sg_id = uuidutils.generate_uuid()
project_id = 'test_prject_id'
t_sg = {'id': t_sg_id, 'name': 'test', 'description': '',
'tenant_id': project_id,
'security_group_rules': []}
b_sg = {'id': b_sg_id, 'name': t_sg_id, 'description': '',
'tenant_id': project_id,
'security_group_rules': []}
top_sgs.append(t_sg)
bottom1_sgs.append(b_sg)
route = {
'top_id': t_sg_id,
'pod_id': pod_id,
'bottom_id': b_sg_id,
'resource_type': constants.RT_SG}
with t_ctx.session.begin():
core.create_resource(t_ctx, models.ResourceRouting, route)
rule = {
'security_group_rule': self._build_test_rule(
t_rule_id, t_sg_id, project_id, '10.0.0.0/24')}
plugin.create_security_group_rule(q_ctx, rule)
self.assertEqual(1, len(bottom1_sgs[0]['security_group_rules']))
b_rule = bottom1_sgs[0]['security_group_rules'][0]
self.assertEqual(b_sg_id, b_rule['security_group_id'])
rule['security_group_rule'].pop('security_group_id', None)
b_rule.pop('security_group_id', None)
self.assertEqual(rule['security_group_rule'], b_rule)
def _test_delete_security_group_rule(self, plugin, q_ctx, t_ctx, pod_id,
top_sgs, top_rules, bottom1_sgs):
t_sg_id = uuidutils.generate_uuid()
t_rule1_id = uuidutils.generate_uuid()
t_rule2_id = uuidutils.generate_uuid()
b_sg_id = uuidutils.generate_uuid()
project_id = 'test_prject_id'
t_rule1 = self._build_test_rule(
t_rule1_id, t_sg_id, project_id, '10.0.1.0/24')
t_rule2 = self._build_test_rule(
t_rule2_id, t_sg_id, project_id, '10.0.2.0/24')
b_rule1 = self._build_test_rule(
t_rule1_id, b_sg_id, project_id, '10.0.1.0/24')
b_rule2 = self._build_test_rule(
t_rule2_id, b_sg_id, project_id, '10.0.2.0/24')
t_sg = {'id': t_sg_id, 'name': 'test', 'description': '',
'tenant_id': project_id,
'security_group_rules': [t_rule1, t_rule2]}
b_sg = {'id': b_sg_id, 'name': t_sg_id, 'description': '',
'tenant_id': project_id,
'security_group_rules': [b_rule1, b_rule2]}
top_sgs.append(t_sg)
top_rules.append(t_rule1)
top_rules.append(t_rule2)
bottom1_sgs.append(b_sg)
route = {
'top_id': t_sg_id,
'pod_id': pod_id,
'bottom_id': b_sg_id,
'resource_type': constants.RT_SG}
with t_ctx.session.begin():
core.create_resource(t_ctx, models.ResourceRouting, route)
plugin.delete_security_group_rule(q_ctx, t_rule1_id)
self.assertEqual(1, len(bottom1_sgs[0]['security_group_rules']))
b_rule = bottom1_sgs[0]['security_group_rules'][0]
self.assertEqual(b_sg_id, b_rule['security_group_id'])
t_rule2.pop('security_group_id', None)
b_rule.pop('security_group_id', None)
self.assertEqual(t_rule2, b_rule)
def _test_handle_remote_group_invalid_input(self, plugin, q_ctx, t_ctx,
pod_id, top_sgs, top_rules,
bottom1_sgs):
t_sg1_id = uuidutils.generate_uuid()
t_sg2_id = uuidutils.generate_uuid()
t_rule1_id = uuidutils.generate_uuid()
t_rule2_id = uuidutils.generate_uuid()
b_sg_id = uuidutils.generate_uuid()
project_id = 'test_prject_id'
t_rule1 = self._build_test_rule(
t_rule1_id, t_sg1_id, project_id, None, t_sg1_id)
t_rule2 = self._build_test_rule(
t_rule2_id, t_sg1_id, project_id, None, t_sg2_id)
t_sg = {'id': t_sg1_id, 'name': 'test', 'description': '',
'tenant_id': project_id,
'security_group_rules': []}
b_sg = {'id': b_sg_id, 'name': t_sg1_id, 'description': '',
'tenant_id': project_id,
'security_group_rules': []}
top_sgs.append(t_sg)
top_rules.append(t_rule1)
bottom1_sgs.append(b_sg)
route = {
'top_id': t_sg1_id,
'pod_id': pod_id,
'bottom_id': b_sg_id,
'resource_type': constants.RT_SG}
with t_ctx.session.begin():
core.create_resource(t_ctx, models.ResourceRouting, route)
self.assertRaises(exceptions.RemoteGroupNotSupported,
plugin.create_security_group_rule, q_ctx,
{'security_group_rule': t_rule2})
self.assertRaises(exceptions.RemoteGroupNotSupported,
plugin.delete_security_group_rule, q_ctx, t_rule1_id)
def _test_handle_default_sg_invalid_input(self, plugin, q_ctx, t_ctx,
pod_id, top_sgs, top_rules,
bottom1_sgs):
t_sg_id = uuidutils.generate_uuid()
t_rule1_id = uuidutils.generate_uuid()
t_rule2_id = uuidutils.generate_uuid()
b_sg_id = uuidutils.generate_uuid()
project_id = 'test_prject_id'
t_rule1 = self._build_test_rule(
t_rule1_id, t_sg_id, project_id, '10.0.0.0/24')
t_rule2 = self._build_test_rule(
t_rule2_id, t_sg_id, project_id, '10.0.1.0/24')
t_sg = {'id': t_sg_id, 'name': 'default', 'description': '',
'tenant_id': project_id,
'security_group_rules': [t_rule1]}
b_sg = {'id': b_sg_id, 'name': t_sg_id, 'description': '',
'tenant_id': project_id,
'security_group_rules': []}
top_sgs.append(t_sg)
top_rules.append(t_rule1)
bottom1_sgs.append(b_sg)
route1 = {
'top_id': t_sg_id,
'pod_id': pod_id,
'bottom_id': b_sg_id,
'resource_type': constants.RT_SG}
with t_ctx.session.begin():
core.create_resource(t_ctx, models.ResourceRouting, route1)
self.assertRaises(exceptions.DefaultGroupUpdateNotSupported,
plugin.create_security_group_rule, q_ctx,
{'security_group_rule': t_rule2})
self.assertRaises(exceptions.DefaultGroupUpdateNotSupported,
plugin.delete_security_group_rule, q_ctx, t_rule1_id)
def _test_create_security_group_rule_exception(
self, plugin, q_ctx, t_ctx, pod_id, top_sgs, bottom1_sgs):
t_sg_id = uuidutils.generate_uuid()
t_rule_id = uuidutils.generate_uuid()
b_sg_id = uuidutils.generate_uuid()
project_id = 'test_prject_id'
t_sg = {'id': t_sg_id, 'name': 'test', 'description': '',
'tenant_id': project_id,
'security_group_rules': []}
b_sg = {'id': b_sg_id, 'name': t_sg_id, 'description': '',
'tenant_id': project_id,
'security_group_rules': []}
top_sgs.append(t_sg)
bottom1_sgs.append(b_sg)
route = {
'top_id': t_sg_id,
'pod_id': pod_id,
'bottom_id': b_sg_id,
'resource_type': constants.RT_SG}
with t_ctx.session.begin():
core.create_resource(t_ctx, models.ResourceRouting, route)
rule = {
'security_group_rule': self._build_test_rule(
t_rule_id, t_sg_id, project_id, '10.0.0.0/24')}
self.assertRaises(exceptions.BottomPodOperationFailure,
plugin.create_security_group_rule, q_ctx, rule)
def _test_delete_security_group_rule_exception(self, plugin, q_ctx, t_ctx,
pod_id, top_sgs, top_rules,
bottom1_sgs):
t_sg_id = uuidutils.generate_uuid()
t_rule_id = uuidutils.generate_uuid()
b_sg_id = uuidutils.generate_uuid()
project_id = 'test_prject_id'
t_rule = self._build_test_rule(
t_rule_id, t_sg_id, project_id, '10.0.1.0/24')
b_rule = self._build_test_rule(
t_rule_id, b_sg_id, project_id, '10.0.1.0/24')
t_sg = {'id': t_sg_id, 'name': 'test', 'description': '',
'tenant_id': project_id,
'security_group_rules': [t_rule]}
b_sg = {'id': b_sg_id, 'name': t_sg_id, 'description': '',
'tenant_id': project_id,
'security_group_rules': [b_rule]}
top_sgs.append(t_sg)
top_rules.append(t_rule)
bottom1_sgs.append(b_sg)
route = {
'top_id': t_sg_id,
'pod_id': pod_id,
'bottom_id': b_sg_id,
'resource_type': constants.RT_SG}
with t_ctx.session.begin():
core.create_resource(t_ctx, models.ResourceRouting, route)
self.assertRaises(exceptions.BottomPodOperationFailure,
plugin.delete_security_group_rule, q_ctx, t_rule_id)