From ca5a92d8e27cf5bdff8b3778425ab71b97f368cb Mon Sep 17 00:00:00 2001 From: Akihiro MOTOKI Date: Fri, 8 Feb 2013 22:18:33 +0900 Subject: [PATCH] Support iptables-based security group in NEC plugin blueprint nec-security-group This commit also refactors RPC API and callbacks in the plugin and agent to support security group RPC. Change-Id: I09d69ca3aff43e0468bbd5df6367de767af27acc --- etc/quantum/plugins/nec/nec.ini | 4 + quantum/agent/securitygroups_rpc.py | 6 +- .../versions/3cb5d900c5de_security_groups.py | 1 + .../plugins/nec/agent/nec_quantum_agent.py | 139 ++++++++++++--- quantum/plugins/nec/db/api.py | 30 ++++ quantum/plugins/nec/nec_plugin.py | 158 ++++++++++++++---- quantum/tests/unit/nec/test_nec_plugin.py | 2 +- quantum/tests/unit/nec/test_security_group.py | 88 ++++++++++ 8 files changed, 369 insertions(+), 59 deletions(-) create mode 100644 quantum/tests/unit/nec/test_security_group.py diff --git a/etc/quantum/plugins/nec/nec.ini b/etc/quantum/plugins/nec/nec.ini index 3de76e1159..21737c5058 100644 --- a/etc/quantum/plugins/nec/nec.ini +++ b/etc/quantum/plugins/nec/nec.ini @@ -39,6 +39,10 @@ polling_interval = 2 # Change to "sudo" to skip the filtering and just run the comand directly root_helper = sudo +[SECURITYGROUP] +# Firewall driver for realizing quantum security group function +firewall_driver = quantum.agent.linux.iptables_firewall.OVSHybridIptablesFirewallDriver + [OFC] # Specify OpenFlow Controller Host, Port and Driver to connect. host = 127.0.0.1 diff --git a/quantum/agent/securitygroups_rpc.py b/quantum/agent/securitygroups_rpc.py index 889d90e655..01b4e110fe 100644 --- a/quantum/agent/securitygroups_rpc.py +++ b/quantum/agent/securitygroups_rpc.py @@ -86,9 +86,9 @@ class SecurityGroupAgentRpcMixin(object): """ def init_firewall(self): - LOG.debug(_("Init firewall settings")) - self.firewall = importutils.import_object( - cfg.CONF.SECURITYGROUP.firewall_driver) + firewall_driver = cfg.CONF.SECURITYGROUP.firewall_driver + LOG.debug(_("Init firewall settings (driver=%s)"), firewall_driver) + self.firewall = importutils.import_object(firewall_driver) def prepare_devices_filter(self, device_ids): if not device_ids: diff --git a/quantum/db/migration/alembic_migrations/versions/3cb5d900c5de_security_groups.py b/quantum/db/migration/alembic_migrations/versions/3cb5d900c5de_security_groups.py index 2478ff9eb0..59e0065373 100644 --- a/quantum/db/migration/alembic_migrations/versions/3cb5d900c5de_security_groups.py +++ b/quantum/db/migration/alembic_migrations/versions/3cb5d900c5de_security_groups.py @@ -33,6 +33,7 @@ migration_for_plugins = [ 'quantum.plugins.linuxbridge.lb_quantum_plugin.LinuxBridgePluginV2', 'quantum.plugins.nicira.nicira_nvp_plugin.QuantumPlugin.NvpPluginV2', 'quantum.plugins.openvswitch.ovs_quantum_plugin.OVSQuantumPluginV2', + 'quantum.plugins.nec.nec_plugin.NECPluginV2', ] from alembic import op diff --git a/quantum/plugins/nec/agent/nec_quantum_agent.py b/quantum/plugins/nec/agent/nec_quantum_agent.py index 8c8786b4e2..a170e0db6c 100755 --- a/quantum/plugins/nec/agent/nec_quantum_agent.py +++ b/quantum/plugins/nec/agent/nec_quantum_agent.py @@ -20,23 +20,99 @@ # License for the specific language governing permissions and limitations # under the License. # @author: Ryota MIBU +# @author: Akihiro MOTOKI import socket import sys import time +import eventlet + from quantum.agent.linux import ovs_lib +from quantum.agent import rpc as agent_rpc +from quantum.agent import securitygroups_rpc as sg_rpc from quantum.common import config as logging_config from quantum.common import topics -from quantum import context +from quantum import context as q_context +from quantum.extensions import securitygroup as ext_sg from quantum.openstack.common import log as logging -from quantum.openstack.common import rpc +from quantum.openstack.common.rpc import dispatcher +from quantum.openstack.common.rpc import proxy from quantum.plugins.nec.common import config LOG = logging.getLogger(__name__) +class NECPluginApi(agent_rpc.PluginApi): + BASE_RPC_API_VERSION = '1.0' + + def update_ports(self, context, agent_id, datapath_id, + port_added, port_removed): + """RPC to update information of ports on Quantum Server""" + LOG.info(_("Update ports: added=%(added)s, " + "removed=%(removed)s"), + {'added': port_added, 'removed': port_removed}) + try: + self.call(context, + self.make_msg('update_ports', + topic=topics.AGENT, + agent_id=agent_id, + datapath_id=datapath_id, + port_added=port_added, + port_removed=port_removed)) + except Exception as e: + LOG.warn(_("update_ports() failed.")) + return + + +class NECAgentRpcCallback(object): + + RPC_API_VERSION = '1.0' + + def __init__(self, context, agent, sg_agent): + self.context = context + self.agent = agent + self.sg_agent = sg_agent + + def port_update(self, context, **kwargs): + LOG.debug(_("port_update received: %s"), kwargs) + port = kwargs.get('port') + # Validate that port is on OVS + vif_port = self.agent.int_br.get_vif_port_by_id(port['id']) + if not vif_port: + return + + if ext_sg.SECURITYGROUPS in port: + self.sg_agent.refresh_firewall() + + +class SecurityGroupServerRpcApi(proxy.RpcProxy, + sg_rpc.SecurityGroupServerRpcApiMixin): + + def __init__(self, topic): + super(SecurityGroupServerRpcApi, self).__init__( + topic=topic, default_version=sg_rpc.SG_RPC_VERSION) + + +class SecurityGroupAgentRpcCallback( + sg_rpc.SecurityGroupAgentRpcCallbackMixin): + + RPC_API_VERSION = sg_rpc.SG_RPC_VERSION + + def __init__(self, context, sg_agent): + self.context = context + self.sg_agent = sg_agent + + +class SecurityGroupAgentRpc(sg_rpc.SecurityGroupAgentRpcMixin): + + def __init__(self, context): + self.context = context + self.plugin_rpc = SecurityGroupServerRpcApi(topics.PLUGIN) + self.init_firewall() + + class NECQuantumAgent(object): def __init__(self, integ_br, root_helper, polling_interval): @@ -49,36 +125,46 @@ class NECQuantumAgent(object): self.int_br = ovs_lib.OVSBridge(integ_br, root_helper) self.polling_interval = polling_interval + self.datapath_id = "0x%s" % self.int_br.get_datapath_id() + self.setup_rpc() + + def setup_rpc(self): self.host = socket.gethostname() self.agent_id = 'nec-q-agent.%s' % self.host - self.datapath_id = "0x%s" % self.int_br.get_datapath_id() + LOG.info(_("RPC agent_id: %s"), self.agent_id) + + self.topic = topics.AGENT + self.context = q_context.get_admin_context_without_session() + + self.plugin_rpc = NECPluginApi(topics.PLUGIN) + self.sg_agent = SecurityGroupAgentRpc(self.context) # RPC network init - self.context = context.get_admin_context_without_session() - self.conn = rpc.create_connection(new=True) - - def update_ports(self, port_added=[], port_removed=[]): - """RPC to update information of ports on Quantum Server""" - LOG.info(_("Update ports: added=%(port_added)s, " - "removed=%(port_removed)s"), - locals()) - try: - rpc.call(self.context, - topics.PLUGIN, - {'method': 'update_ports', - 'args': {'topic': topics.AGENT, - 'agent_id': self.agent_id, - 'datapath_id': self.datapath_id, - 'port_added': port_added, - 'port_removed': port_removed}}) - except Exception as e: - LOG.warn(_("update_ports() failed.")) - return + # Handle updates from service + self.callback_nec = NECAgentRpcCallback(self.context, + self, self.sg_agent) + self.callback_sg = SecurityGroupAgentRpcCallback(self.context, + self.sg_agent) + self.dispatcher = dispatcher.RpcDispatcher([self.callback_nec, + self.callback_sg]) + # Define the listening consumer for the agent + consumers = [[topics.PORT, topics.UPDATE], + [topics.SECURITY_GROUP, topics.UPDATE]] + self.connection = agent_rpc.create_consumers(self.dispatcher, + self.topic, + consumers) def _vif_port_to_port_info(self, vif_port): return dict(id=vif_port.vif_id, port_no=vif_port.ofport, mac=vif_port.vif_mac) + def _process_security_group(self, port_added, port_removed): + if port_added: + devices_added = [p['id'] for p in port_added] + self.sg_agent.prepare_devices_filter(devices_added) + if port_removed: + self.sg_agent.remove_devices_filter(port_removed) + def daemon_loop(self): """Main processing loop for NEC Plugin Agent.""" old_ports = [] @@ -99,7 +185,10 @@ class NECQuantumAgent(object): port_removed.append(port_id) if port_added or port_removed: - self.update_ports(port_added, port_removed) + self.plugin_rpc.update_ports(self.context, + self.agent_id, self.datapath_id, + port_added, port_removed) + self._process_security_group(port_added, port_removed) else: LOG.debug(_("No port changed.")) @@ -108,6 +197,8 @@ class NECQuantumAgent(object): def main(): + eventlet.monkey_patch() + config.CONF(project='quantum') logging_config.setup_logging(config.CONF) diff --git a/quantum/plugins/nec/db/api.py b/quantum/plugins/nec/db/api.py index 7eb822da56..ca2b27656f 100644 --- a/quantum/plugins/nec/db/api.py +++ b/quantum/plugins/nec/db/api.py @@ -19,6 +19,10 @@ import sqlalchemy as sa from quantum.db import api as db from quantum.db import model_base +from quantum.db import models_v2 +from quantum.db import securitygroups_db as sg_db +from quantum.extensions import securitygroup as ext_sg +from quantum import manager from quantum.openstack.common import log as logging # NOTE (e0ne): this import is needed for config init from quantum.plugins.nec.common import config @@ -117,3 +121,29 @@ def del_portinfo(id): except sa.orm.exc.NoResultFound: LOG.warning(_("del_portinfo(): NotFound portinfo for " "port_id: %s"), id) + + +def get_port_from_device(port_id): + """Get port from database""" + LOG.debug(_("get_port_with_securitygroups() called:port_id=%s"), port_id) + session = db.get_session() + sg_binding_port = sg_db.SecurityGroupPortBinding.port_id + + query = session.query(models_v2.Port, + sg_db.SecurityGroupPortBinding.security_group_id) + query = query.outerjoin(sg_db.SecurityGroupPortBinding, + models_v2.Port.id == sg_binding_port) + query = query.filter(models_v2.Port.id == port_id) + port_and_sgs = query.all() + if not port_and_sgs: + return None + port = port_and_sgs[0][0] + plugin = manager.QuantumManager.get_plugin() + port_dict = plugin._make_port_dict(port) + port_dict[ext_sg.SECURITYGROUPS] = [ + sg_id for port, sg_id in port_and_sgs if sg_id] + port_dict['security_group_rules'] = [] + port_dict['security_group_source_groups'] = [] + port_dict['fixed_ips'] = [ip['ip_address'] + for ip in port['fixed_ips']] + return port_dict diff --git a/quantum/plugins/nec/nec_plugin.py b/quantum/plugins/nec/nec_plugin.py index 552f8ce058..0149157c0d 100644 --- a/quantum/plugins/nec/nec_plugin.py +++ b/quantum/plugins/nec/nec_plugin.py @@ -15,6 +15,9 @@ # under the License. # @author: Ryota MIBU +from quantum.agent import securitygroups_rpc as sg_rpc +from quantum.common import constants as q_const +from quantum.common import exceptions as q_exc from quantum.common import rpc as q_rpc from quantum.common import topics from quantum import context @@ -23,9 +26,12 @@ from quantum.db import l3_db from quantum.db import l3_rpc_base #NOTE(amotoki): quota_db cannot be removed, it is for db model from quantum.db import quota_db +from quantum.db import securitygroups_rpc_base as sg_db_rpc from quantum.extensions import portbindings +from quantum.extensions import securitygroup as ext_sg from quantum.openstack.common import log as logging from quantum.openstack.common import rpc +from quantum.openstack.common.rpc import proxy from quantum.plugins.nec.common import config from quantum.plugins.nec.common import exceptions as nexc from quantum.plugins.nec.db import api as ndb @@ -51,7 +57,9 @@ class OperationalStatus: ERROR = "ERROR" -class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin): +class NECPluginV2(nec_plugin_base.NECPluginV2Base, + l3_db.L3_NAT_db_mixin, + sg_db_rpc.SecurityGroupServerRpcMixin): """NECPluginV2 controls an OpenFlow Controller. The Quantum NECPluginV2 maps L2 logical networks to L2 virtualized networks @@ -65,7 +73,8 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin): information to and from the plugin. """ - supported_extension_aliases = ["router", "quotas", "binding"] + supported_extension_aliases = ["router", "quotas", "binding", + "security-group"] binding_view = "extension:port_binding:view" binding_set = "extension:port_binding:set" @@ -81,21 +90,28 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin): self.setup_rpc() + def setup_rpc(self): + self.topic = topics.PLUGIN + self.conn = rpc.create_connection(new=True) + self.notifier = NECPluginV2AgentNotifierApi(topics.AGENT) + + self.callback_nec = NECPluginV2RPCCallbacks(self) + self.callback_dhcp = DhcpRpcCallback() + self.callback_l3 = L3RpcCallback() + self.callback_sg = SecurityGroupServerRpcCallback() + callbacks = [self.callback_nec, self.callback_dhcp, + self.callback_l3, self.callback_sg] + self.dispatcher = q_rpc.PluginRpcDispatcher(callbacks) + self.conn.create_consumer(self.topic, self.dispatcher, fanout=False) + # Consume from all consumers in a thread + self.conn.consume_in_thread() + def _check_view_auth(self, context, resource, action): return policy.check(context, action, resource) def _enforce_set_auth(self, context, resource, action): policy.enforce(context, action, resource) - def setup_rpc(self): - self.topic = topics.PLUGIN - self.conn = rpc.create_connection(new=True) - self.callbacks = NECPluginV2RPCCallbacks(self) - self.dispatcher = self.callbacks.create_rpc_dispatcher() - self.conn.create_consumer(self.topic, self.dispatcher, fanout=False) - # Consume from all consumers in a thread - self.conn.consume_in_thread() - def _update_resource_status(self, context, resource, id, status): """Update status of specified resource.""" request = {} @@ -199,8 +215,12 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin): """Create a new network entry on DB, and create it on OFC.""" LOG.debug(_("NECPluginV2.create_network() called, " "network=%s ."), network) - session = context.session - with session.begin(subtransactions=True): + #set up default security groups + tenant_id = self._get_tenant_id_for_create( + context, network['network']) + self._ensure_default_security_group(context, tenant_id) + + with context.session.begin(subtransactions=True): new_net = super(NECPluginV2, self).create_network(context, network) self._process_l3_create(context, network['network'], new_net['id']) self._extend_network_dict_l3(context, new_net) @@ -337,12 +357,25 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin): def create_port(self, context, port): """Create a new port entry on DB, then try to activate it.""" LOG.debug(_("NECPluginV2.create_port() called, port=%s ."), port) - new_port = super(NECPluginV2, self).create_port(context, port) - self._update_resource_status(context, "port", new_port['id'], - OperationalStatus.BUILD) + with context.session.begin(subtransactions=True): + self._ensure_default_security_group_on_port(context, port) + sgids = self._get_security_groups_on_port(context, port) + port = super(NECPluginV2, self).create_port(context, port) + self._process_port_create_security_group( + context, port['id'], sgids) + self._extend_port_dict_security_group(context, port) + # Note: In order to allow dhcp packets, + # changes for dhcp ip should be notifified + if port['device_owner'] == q_const.DEVICE_OWNER_DHCP: + self.notifier.security_groups_provider_updated(context) + else: + self.notifier.security_groups_member_updated( + context, port.get(ext_sg.SECURITYGROUPS)) - self.activate_port_if_ready(context, new_port) - return self._extend_port_dict_binding(context, new_port) + self._update_resource_status(context, "port", port['id'], + OperationalStatus.BUILD) + self.activate_port_if_ready(context, port) + return self._extend_port_dict_binding(context, port) def update_port(self, context, id, port): """Update port, and handle packetfilters associated with the port. @@ -352,23 +385,37 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin): """ LOG.debug(_("NECPluginV2.update_port() called, " "id=%(id)s port=%(port)s ."), locals()) - old_port = super(NECPluginV2, self).get_port(context, id) - new_port = super(NECPluginV2, self).update_port(context, id, port) + need_port_update_notify = False + with context.session.begin(subtransactions=True): + old_port = super(NECPluginV2, self).get_port(context, id) + new_port = super(NECPluginV2, self).update_port(context, id, port) + need_port_update_notify = self.update_security_group_on_port( + context, id, port, old_port, new_port) - changed = (old_port['admin_state_up'] is - not new_port['admin_state_up']) + need_port_update_notify |= self.is_security_group_member_updated( + context, old_port, new_port) + if need_port_update_notify: + self.notifier.port_update(context, new_port) + + changed = (old_port['admin_state_up'] != new_port['admin_state_up']) if changed: if new_port['admin_state_up']: self.activate_port_if_ready(context, new_port) else: self.deactivate_port(context, old_port) + # NOTE: _extend_port_dict_security_group() is called in + # update_security_group_on_port() above, so we don't need to + # call it here. return self._extend_port_dict_binding(context, new_port) def delete_port(self, context, id, l3_port_check=True): """Delete port and packet_filters associated with the port.""" LOG.debug(_("NECPluginV2.delete_port() called, id=%s ."), id) - port = super(NECPluginV2, self).get_port(context, id) + # ext_sg.SECURITYGROUPS attribute for the port is required + # since notifier.security_groups_member_updated() need the attribute. + # Thus we need to call self.get_port() instead of super().get_port() + port = self.get_port(context, id) self.deactivate_port(context, port) @@ -384,22 +431,27 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin): # and l3-router. If so, we should prevent deletion. if l3_port_check: self.prevent_l3_port_deletion(context, id) - self.disassociate_floatingips(context, id) - super(NECPluginV2, self).delete_port(context, id) + with context.session.begin(subtransactions=True): + self.disassociate_floatingips(context, id) + self._delete_port_security_group_bindings(context, id) + super(NECPluginV2, self).delete_port(context, id) + self.notifier.security_groups_member_updated( + context, port.get(ext_sg.SECURITYGROUPS)) def get_port(self, context, id, fields=None): - session = context.session - with session.begin(subtransactions=True): + with context.session.begin(subtransactions=True): port = super(NECPluginV2, self).get_port(context, id, fields) + self._extend_port_dict_security_group(context, port) self._extend_port_dict_binding(context, port) return self._fields(port, fields) def get_ports(self, context, filters=None, fields=None): - session = context.session - with session.begin(subtransactions=True): + with context.session.begin(subtransactions=True): ports = super(NECPluginV2, self).get_ports(context, filters, fields) + # TODO(amotoki) filter by security group for port in ports: + self._extend_port_dict_security_group(context, port) self._extend_port_dict_binding(context, port) return [self._fields(port, fields) for port in ports] @@ -529,8 +581,52 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin): super(NECPluginV2, self).delete_packet_filter(context, id) -class NECPluginV2RPCCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin, - l3_rpc_base.L3RpcCallbackMixin): +class NECPluginV2AgentNotifierApi(proxy.RpcProxy, + sg_rpc.SecurityGroupAgentRpcApiMixin): + '''RPC API for NEC plugin agent''' + + BASE_RPC_API_VERSION = '1.0' + + def __init__(self, topic): + super(NECPluginV2AgentNotifierApi, self).__init__( + topic=topic, default_version=self.BASE_RPC_API_VERSION) + self.topic_port_update = topics.get_topic_name( + topic, topics.PORT, topics.UPDATE) + + def port_update(self, context, port): + self.fanout_cast(context, + self.make_msg('port_update', + port=port), + topic=self.topic_port_update) + + +class DhcpRpcCallback(dhcp_rpc_base.DhcpRpcCallbackMixin): + # DhcpPluginApi BASE_RPC_API_VERSION + RPC_API_VERSION = '1.0' + + +class L3RpcCallback(l3_rpc_base.L3RpcCallbackMixin): + # L3PluginApi BASE_RPC_API_VERSION + RPC_API_VERSION = '1.0' + + +class SecurityGroupServerRpcCallback( + sg_db_rpc.SecurityGroupServerRpcCallbackMixin): + + RPC_API_VERSION = sg_rpc.SG_RPC_VERSION + + @staticmethod + def get_port_from_device(device): + port = ndb.get_port_from_device(device) + if port: + port['device'] = device + LOG.debug(_("NECPluginV2RPCCallbacks.get_port_from_device() called, " + "device=%(device)s => %(ret)s."), + {'device': device, 'ret': port}) + return port + + +class NECPluginV2RPCCallbacks(object): RPC_API_VERSION = '1.0' diff --git a/quantum/tests/unit/nec/test_nec_plugin.py b/quantum/tests/unit/nec/test_nec_plugin.py index 9a06f96d77..747583ad35 100644 --- a/quantum/tests/unit/nec/test_nec_plugin.py +++ b/quantum/tests/unit/nec/test_nec_plugin.py @@ -39,7 +39,7 @@ class TestNecPortsV2(test_plugin.TestPortsV2, NecPluginV2TestCase, test_bindings.PortBindingsTestCase): VIF_TYPE = portbindings.VIF_TYPE_OVS - HAS_SECURITY_GROUP = False + HAS_PORT_FILTER = True class TestNecNetworksV2(test_plugin.TestNetworksV2, NecPluginV2TestCase): diff --git a/quantum/tests/unit/nec/test_security_group.py b/quantum/tests/unit/nec/test_security_group.py new file mode 100644 index 0000000000..8b1512ed6a --- /dev/null +++ b/quantum/tests/unit/nec/test_security_group.py @@ -0,0 +1,88 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013, NEC Corporation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import contextlib + +import mock + +from quantum.api.v2 import attributes +from quantum.extensions import securitygroup as ext_sg +from quantum import manager +from quantum.plugins.nec.db import api as ndb +from quantum.tests.unit import test_extension_security_group as test_sg +from quantum.tests.unit import test_security_groups_rpc as test_sg_rpc + +PLUGIN_NAME = ('quantum.plugins.nec.nec_plugin.NECPluginV2') +AGENT_NAME = ('quantum.plugins.nec.agent.nec_quantum_agent.NECQuantumAgent') +NOTIFIER = ('quantum.plugins.nec.nec_plugin.NECPluginV2AgentNotifierApi') + + +class NecSecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase): + _plugin_name = PLUGIN_NAME + + def setUp(self, plugin=None): + self.addCleanup(mock.patch.stopall) + notifier_p = mock.patch(NOTIFIER) + notifier_cls = notifier_p.start() + self.notifier = mock.Mock() + notifier_cls.return_value = self.notifier + self._attribute_map_bk_ = {} + for item in attributes.RESOURCE_ATTRIBUTE_MAP: + self._attribute_map_bk_[item] = (attributes. + RESOURCE_ATTRIBUTE_MAP[item]. + copy()) + super(NecSecurityGroupsTestCase, self).setUp(PLUGIN_NAME) + + def tearDown(self): + super(NecSecurityGroupsTestCase, self).tearDown() + attributes.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk_ + + +class TestNecSecurityGroups(NecSecurityGroupsTestCase, + test_sg.TestSecurityGroups, + test_sg_rpc.SGNotificationTestMixin): + + def test_security_group_get_port_from_device(self): + with contextlib.nested(self.network(), + self.security_group()) as (n, sg): + with self.subnet(n): + res = self._create_port(self.fmt, n['network']['id']) + port = self.deserialize(self.fmt, res) + port_id = port['port']['id'] + sg_id = sg['security_group']['id'] + fixed_ips = port['port']['fixed_ips'] + + data = {'port': {'fixed_ips': fixed_ips, + 'name': port['port']['name'], + ext_sg.SECURITYGROUPS: [sg_id]}} + req = self.new_update_request('ports', data, port_id) + res = self.deserialize(self.fmt, + req.get_response(self.api)) + + plugin = manager.QuantumManager.get_plugin() + port_dict = plugin.callback_sg.get_port_from_device(port_id) + self.assertEqual(port_id, port_dict['id']) + self.assertEqual([sg_id], + port_dict[ext_sg.SECURITYGROUPS]) + self.assertEqual([], port_dict['security_group_rules']) + self.assertEqual([fixed_ips[0]['ip_address']], + port_dict['fixed_ips']) + self._delete('ports', port_id) + + +class TestNecSecurityGroupsXML(TestNecSecurityGroups): + fmt = 'xml'