Merge "Support iptables-based security group in NEC plugin"

This commit is contained in:
Jenkins 2013-02-14 10:01:51 +00:00 committed by Gerrit Code Review
commit b393aff794
8 changed files with 369 additions and 59 deletions

View File

@ -39,6 +39,10 @@ polling_interval = 2
# Change to "sudo" to skip the filtering and just run the comand directly # Change to "sudo" to skip the filtering and just run the comand directly
root_helper = sudo root_helper = sudo
[SECURITYGROUP]
# Firewall driver for realizing quantum security group function
firewall_driver = quantum.agent.linux.iptables_firewall.OVSHybridIptablesFirewallDriver
[OFC] [OFC]
# Specify OpenFlow Controller Host, Port and Driver to connect. # Specify OpenFlow Controller Host, Port and Driver to connect.
host = 127.0.0.1 host = 127.0.0.1

View File

@ -86,9 +86,9 @@ class SecurityGroupAgentRpcMixin(object):
""" """
def init_firewall(self): def init_firewall(self):
LOG.debug(_("Init firewall settings")) firewall_driver = cfg.CONF.SECURITYGROUP.firewall_driver
self.firewall = importutils.import_object( LOG.debug(_("Init firewall settings (driver=%s)"), firewall_driver)
cfg.CONF.SECURITYGROUP.firewall_driver) self.firewall = importutils.import_object(firewall_driver)
def prepare_devices_filter(self, device_ids): def prepare_devices_filter(self, device_ids):
if not device_ids: if not device_ids:

View File

@ -33,6 +33,7 @@ migration_for_plugins = [
'quantum.plugins.linuxbridge.lb_quantum_plugin.LinuxBridgePluginV2', 'quantum.plugins.linuxbridge.lb_quantum_plugin.LinuxBridgePluginV2',
'quantum.plugins.nicira.nicira_nvp_plugin.QuantumPlugin.NvpPluginV2', 'quantum.plugins.nicira.nicira_nvp_plugin.QuantumPlugin.NvpPluginV2',
'quantum.plugins.openvswitch.ovs_quantum_plugin.OVSQuantumPluginV2', 'quantum.plugins.openvswitch.ovs_quantum_plugin.OVSQuantumPluginV2',
'quantum.plugins.nec.nec_plugin.NECPluginV2',
] ]
from alembic import op from alembic import op

View File

@ -20,23 +20,99 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
# @author: Ryota MIBU # @author: Ryota MIBU
# @author: Akihiro MOTOKI
import socket import socket
import sys import sys
import time import time
import eventlet
from quantum.agent.linux import ovs_lib from quantum.agent.linux import ovs_lib
from quantum.agent import rpc as agent_rpc
from quantum.agent import securitygroups_rpc as sg_rpc
from quantum.common import config as logging_config from quantum.common import config as logging_config
from quantum.common import topics from quantum.common import topics
from quantum import context from quantum import context as q_context
from quantum.extensions import securitygroup as ext_sg
from quantum.openstack.common import log as logging from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc from quantum.openstack.common.rpc import dispatcher
from quantum.openstack.common.rpc import proxy
from quantum.plugins.nec.common import config from quantum.plugins.nec.common import config
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class NECPluginApi(agent_rpc.PluginApi):
BASE_RPC_API_VERSION = '1.0'
def update_ports(self, context, agent_id, datapath_id,
port_added, port_removed):
"""RPC to update information of ports on Quantum Server"""
LOG.info(_("Update ports: added=%(added)s, "
"removed=%(removed)s"),
{'added': port_added, 'removed': port_removed})
try:
self.call(context,
self.make_msg('update_ports',
topic=topics.AGENT,
agent_id=agent_id,
datapath_id=datapath_id,
port_added=port_added,
port_removed=port_removed))
except Exception as e:
LOG.warn(_("update_ports() failed."))
return
class NECAgentRpcCallback(object):
RPC_API_VERSION = '1.0'
def __init__(self, context, agent, sg_agent):
self.context = context
self.agent = agent
self.sg_agent = sg_agent
def port_update(self, context, **kwargs):
LOG.debug(_("port_update received: %s"), kwargs)
port = kwargs.get('port')
# Validate that port is on OVS
vif_port = self.agent.int_br.get_vif_port_by_id(port['id'])
if not vif_port:
return
if ext_sg.SECURITYGROUPS in port:
self.sg_agent.refresh_firewall()
class SecurityGroupServerRpcApi(proxy.RpcProxy,
sg_rpc.SecurityGroupServerRpcApiMixin):
def __init__(self, topic):
super(SecurityGroupServerRpcApi, self).__init__(
topic=topic, default_version=sg_rpc.SG_RPC_VERSION)
class SecurityGroupAgentRpcCallback(
sg_rpc.SecurityGroupAgentRpcCallbackMixin):
RPC_API_VERSION = sg_rpc.SG_RPC_VERSION
def __init__(self, context, sg_agent):
self.context = context
self.sg_agent = sg_agent
class SecurityGroupAgentRpc(sg_rpc.SecurityGroupAgentRpcMixin):
def __init__(self, context):
self.context = context
self.plugin_rpc = SecurityGroupServerRpcApi(topics.PLUGIN)
self.init_firewall()
class NECQuantumAgent(object): class NECQuantumAgent(object):
def __init__(self, integ_br, root_helper, polling_interval): def __init__(self, integ_br, root_helper, polling_interval):
@ -49,36 +125,46 @@ class NECQuantumAgent(object):
self.int_br = ovs_lib.OVSBridge(integ_br, root_helper) self.int_br = ovs_lib.OVSBridge(integ_br, root_helper)
self.polling_interval = polling_interval self.polling_interval = polling_interval
self.datapath_id = "0x%s" % self.int_br.get_datapath_id()
self.setup_rpc()
def setup_rpc(self):
self.host = socket.gethostname() self.host = socket.gethostname()
self.agent_id = 'nec-q-agent.%s' % self.host self.agent_id = 'nec-q-agent.%s' % self.host
self.datapath_id = "0x%s" % self.int_br.get_datapath_id() LOG.info(_("RPC agent_id: %s"), self.agent_id)
self.topic = topics.AGENT
self.context = q_context.get_admin_context_without_session()
self.plugin_rpc = NECPluginApi(topics.PLUGIN)
self.sg_agent = SecurityGroupAgentRpc(self.context)
# RPC network init # RPC network init
self.context = context.get_admin_context_without_session() # Handle updates from service
self.conn = rpc.create_connection(new=True) self.callback_nec = NECAgentRpcCallback(self.context,
self, self.sg_agent)
def update_ports(self, port_added=[], port_removed=[]): self.callback_sg = SecurityGroupAgentRpcCallback(self.context,
"""RPC to update information of ports on Quantum Server""" self.sg_agent)
LOG.info(_("Update ports: added=%(port_added)s, " self.dispatcher = dispatcher.RpcDispatcher([self.callback_nec,
"removed=%(port_removed)s"), self.callback_sg])
locals()) # Define the listening consumer for the agent
try: consumers = [[topics.PORT, topics.UPDATE],
rpc.call(self.context, [topics.SECURITY_GROUP, topics.UPDATE]]
topics.PLUGIN, self.connection = agent_rpc.create_consumers(self.dispatcher,
{'method': 'update_ports', self.topic,
'args': {'topic': topics.AGENT, consumers)
'agent_id': self.agent_id,
'datapath_id': self.datapath_id,
'port_added': port_added,
'port_removed': port_removed}})
except Exception as e:
LOG.warn(_("update_ports() failed."))
return
def _vif_port_to_port_info(self, vif_port): def _vif_port_to_port_info(self, vif_port):
return dict(id=vif_port.vif_id, port_no=vif_port.ofport, return dict(id=vif_port.vif_id, port_no=vif_port.ofport,
mac=vif_port.vif_mac) mac=vif_port.vif_mac)
def _process_security_group(self, port_added, port_removed):
if port_added:
devices_added = [p['id'] for p in port_added]
self.sg_agent.prepare_devices_filter(devices_added)
if port_removed:
self.sg_agent.remove_devices_filter(port_removed)
def daemon_loop(self): def daemon_loop(self):
"""Main processing loop for NEC Plugin Agent.""" """Main processing loop for NEC Plugin Agent."""
old_ports = [] old_ports = []
@ -99,7 +185,10 @@ class NECQuantumAgent(object):
port_removed.append(port_id) port_removed.append(port_id)
if port_added or port_removed: if port_added or port_removed:
self.update_ports(port_added, port_removed) self.plugin_rpc.update_ports(self.context,
self.agent_id, self.datapath_id,
port_added, port_removed)
self._process_security_group(port_added, port_removed)
else: else:
LOG.debug(_("No port changed.")) LOG.debug(_("No port changed."))
@ -108,6 +197,8 @@ class NECQuantumAgent(object):
def main(): def main():
eventlet.monkey_patch()
config.CONF(project='quantum') config.CONF(project='quantum')
logging_config.setup_logging(config.CONF) logging_config.setup_logging(config.CONF)

View File

@ -19,6 +19,10 @@ import sqlalchemy as sa
from quantum.db import api as db from quantum.db import api as db
from quantum.db import model_base from quantum.db import model_base
from quantum.db import models_v2
from quantum.db import securitygroups_db as sg_db
from quantum.extensions import securitygroup as ext_sg
from quantum import manager
from quantum.openstack.common import log as logging from quantum.openstack.common import log as logging
# NOTE (e0ne): this import is needed for config init # NOTE (e0ne): this import is needed for config init
from quantum.plugins.nec.common import config from quantum.plugins.nec.common import config
@ -117,3 +121,29 @@ def del_portinfo(id):
except sa.orm.exc.NoResultFound: except sa.orm.exc.NoResultFound:
LOG.warning(_("del_portinfo(): NotFound portinfo for " LOG.warning(_("del_portinfo(): NotFound portinfo for "
"port_id: %s"), id) "port_id: %s"), id)
def get_port_from_device(port_id):
"""Get port from database"""
LOG.debug(_("get_port_with_securitygroups() called:port_id=%s"), port_id)
session = db.get_session()
sg_binding_port = sg_db.SecurityGroupPortBinding.port_id
query = session.query(models_v2.Port,
sg_db.SecurityGroupPortBinding.security_group_id)
query = query.outerjoin(sg_db.SecurityGroupPortBinding,
models_v2.Port.id == sg_binding_port)
query = query.filter(models_v2.Port.id == port_id)
port_and_sgs = query.all()
if not port_and_sgs:
return None
port = port_and_sgs[0][0]
plugin = manager.QuantumManager.get_plugin()
port_dict = plugin._make_port_dict(port)
port_dict[ext_sg.SECURITYGROUPS] = [
sg_id for port, sg_id in port_and_sgs if sg_id]
port_dict['security_group_rules'] = []
port_dict['security_group_source_groups'] = []
port_dict['fixed_ips'] = [ip['ip_address']
for ip in port['fixed_ips']]
return port_dict

View File

@ -15,6 +15,9 @@
# under the License. # under the License.
# @author: Ryota MIBU # @author: Ryota MIBU
from quantum.agent import securitygroups_rpc as sg_rpc
from quantum.common import constants as q_const
from quantum.common import exceptions as q_exc
from quantum.common import rpc as q_rpc from quantum.common import rpc as q_rpc
from quantum.common import topics from quantum.common import topics
from quantum import context from quantum import context
@ -23,9 +26,12 @@ from quantum.db import l3_db
from quantum.db import l3_rpc_base from quantum.db import l3_rpc_base
#NOTE(amotoki): quota_db cannot be removed, it is for db model #NOTE(amotoki): quota_db cannot be removed, it is for db model
from quantum.db import quota_db from quantum.db import quota_db
from quantum.db import securitygroups_rpc_base as sg_db_rpc
from quantum.extensions import portbindings from quantum.extensions import portbindings
from quantum.extensions import securitygroup as ext_sg
from quantum.openstack.common import log as logging from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import proxy
from quantum.plugins.nec.common import config from quantum.plugins.nec.common import config
from quantum.plugins.nec.common import exceptions as nexc from quantum.plugins.nec.common import exceptions as nexc
from quantum.plugins.nec.db import api as ndb from quantum.plugins.nec.db import api as ndb
@ -51,7 +57,9 @@ class OperationalStatus:
ERROR = "ERROR" ERROR = "ERROR"
class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin): class NECPluginV2(nec_plugin_base.NECPluginV2Base,
l3_db.L3_NAT_db_mixin,
sg_db_rpc.SecurityGroupServerRpcMixin):
"""NECPluginV2 controls an OpenFlow Controller. """NECPluginV2 controls an OpenFlow Controller.
The Quantum NECPluginV2 maps L2 logical networks to L2 virtualized networks The Quantum NECPluginV2 maps L2 logical networks to L2 virtualized networks
@ -65,7 +73,8 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
information to and from the plugin. information to and from the plugin.
""" """
supported_extension_aliases = ["router", "quotas", "binding"] supported_extension_aliases = ["router", "quotas", "binding",
"security-group"]
binding_view = "extension:port_binding:view" binding_view = "extension:port_binding:view"
binding_set = "extension:port_binding:set" binding_set = "extension:port_binding:set"
@ -81,21 +90,28 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
self.setup_rpc() self.setup_rpc()
def setup_rpc(self):
self.topic = topics.PLUGIN
self.conn = rpc.create_connection(new=True)
self.notifier = NECPluginV2AgentNotifierApi(topics.AGENT)
self.callback_nec = NECPluginV2RPCCallbacks(self)
self.callback_dhcp = DhcpRpcCallback()
self.callback_l3 = L3RpcCallback()
self.callback_sg = SecurityGroupServerRpcCallback()
callbacks = [self.callback_nec, self.callback_dhcp,
self.callback_l3, self.callback_sg]
self.dispatcher = q_rpc.PluginRpcDispatcher(callbacks)
self.conn.create_consumer(self.topic, self.dispatcher, fanout=False)
# Consume from all consumers in a thread
self.conn.consume_in_thread()
def _check_view_auth(self, context, resource, action): def _check_view_auth(self, context, resource, action):
return policy.check(context, action, resource) return policy.check(context, action, resource)
def _enforce_set_auth(self, context, resource, action): def _enforce_set_auth(self, context, resource, action):
policy.enforce(context, action, resource) policy.enforce(context, action, resource)
def setup_rpc(self):
self.topic = topics.PLUGIN
self.conn = rpc.create_connection(new=True)
self.callbacks = NECPluginV2RPCCallbacks(self)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher, fanout=False)
# Consume from all consumers in a thread
self.conn.consume_in_thread()
def _update_resource_status(self, context, resource, id, status): def _update_resource_status(self, context, resource, id, status):
"""Update status of specified resource.""" """Update status of specified resource."""
request = {} request = {}
@ -199,8 +215,12 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
"""Create a new network entry on DB, and create it on OFC.""" """Create a new network entry on DB, and create it on OFC."""
LOG.debug(_("NECPluginV2.create_network() called, " LOG.debug(_("NECPluginV2.create_network() called, "
"network=%s ."), network) "network=%s ."), network)
session = context.session #set up default security groups
with session.begin(subtransactions=True): tenant_id = self._get_tenant_id_for_create(
context, network['network'])
self._ensure_default_security_group(context, tenant_id)
with context.session.begin(subtransactions=True):
new_net = super(NECPluginV2, self).create_network(context, network) new_net = super(NECPluginV2, self).create_network(context, network)
self._process_l3_create(context, network['network'], new_net['id']) self._process_l3_create(context, network['network'], new_net['id'])
self._extend_network_dict_l3(context, new_net) self._extend_network_dict_l3(context, new_net)
@ -337,12 +357,25 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
def create_port(self, context, port): def create_port(self, context, port):
"""Create a new port entry on DB, then try to activate it.""" """Create a new port entry on DB, then try to activate it."""
LOG.debug(_("NECPluginV2.create_port() called, port=%s ."), port) LOG.debug(_("NECPluginV2.create_port() called, port=%s ."), port)
new_port = super(NECPluginV2, self).create_port(context, port) with context.session.begin(subtransactions=True):
self._update_resource_status(context, "port", new_port['id'], self._ensure_default_security_group_on_port(context, port)
OperationalStatus.BUILD) sgids = self._get_security_groups_on_port(context, port)
port = super(NECPluginV2, self).create_port(context, port)
self._process_port_create_security_group(
context, port['id'], sgids)
self._extend_port_dict_security_group(context, port)
# Note: In order to allow dhcp packets,
# changes for dhcp ip should be notifified
if port['device_owner'] == q_const.DEVICE_OWNER_DHCP:
self.notifier.security_groups_provider_updated(context)
else:
self.notifier.security_groups_member_updated(
context, port.get(ext_sg.SECURITYGROUPS))
self.activate_port_if_ready(context, new_port) self._update_resource_status(context, "port", port['id'],
return self._extend_port_dict_binding(context, new_port) OperationalStatus.BUILD)
self.activate_port_if_ready(context, port)
return self._extend_port_dict_binding(context, port)
def update_port(self, context, id, port): def update_port(self, context, id, port):
"""Update port, and handle packetfilters associated with the port. """Update port, and handle packetfilters associated with the port.
@ -352,23 +385,37 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
""" """
LOG.debug(_("NECPluginV2.update_port() called, " LOG.debug(_("NECPluginV2.update_port() called, "
"id=%(id)s port=%(port)s ."), locals()) "id=%(id)s port=%(port)s ."), locals())
old_port = super(NECPluginV2, self).get_port(context, id) need_port_update_notify = False
new_port = super(NECPluginV2, self).update_port(context, id, port) with context.session.begin(subtransactions=True):
old_port = super(NECPluginV2, self).get_port(context, id)
new_port = super(NECPluginV2, self).update_port(context, id, port)
need_port_update_notify = self.update_security_group_on_port(
context, id, port, old_port, new_port)
changed = (old_port['admin_state_up'] is need_port_update_notify |= self.is_security_group_member_updated(
not new_port['admin_state_up']) context, old_port, new_port)
if need_port_update_notify:
self.notifier.port_update(context, new_port)
changed = (old_port['admin_state_up'] != new_port['admin_state_up'])
if changed: if changed:
if new_port['admin_state_up']: if new_port['admin_state_up']:
self.activate_port_if_ready(context, new_port) self.activate_port_if_ready(context, new_port)
else: else:
self.deactivate_port(context, old_port) self.deactivate_port(context, old_port)
# NOTE: _extend_port_dict_security_group() is called in
# update_security_group_on_port() above, so we don't need to
# call it here.
return self._extend_port_dict_binding(context, new_port) return self._extend_port_dict_binding(context, new_port)
def delete_port(self, context, id, l3_port_check=True): def delete_port(self, context, id, l3_port_check=True):
"""Delete port and packet_filters associated with the port.""" """Delete port and packet_filters associated with the port."""
LOG.debug(_("NECPluginV2.delete_port() called, id=%s ."), id) LOG.debug(_("NECPluginV2.delete_port() called, id=%s ."), id)
port = super(NECPluginV2, self).get_port(context, id) # ext_sg.SECURITYGROUPS attribute for the port is required
# since notifier.security_groups_member_updated() need the attribute.
# Thus we need to call self.get_port() instead of super().get_port()
port = self.get_port(context, id)
self.deactivate_port(context, port) self.deactivate_port(context, port)
@ -384,22 +431,27 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
# and l3-router. If so, we should prevent deletion. # and l3-router. If so, we should prevent deletion.
if l3_port_check: if l3_port_check:
self.prevent_l3_port_deletion(context, id) self.prevent_l3_port_deletion(context, id)
self.disassociate_floatingips(context, id) with context.session.begin(subtransactions=True):
super(NECPluginV2, self).delete_port(context, id) self.disassociate_floatingips(context, id)
self._delete_port_security_group_bindings(context, id)
super(NECPluginV2, self).delete_port(context, id)
self.notifier.security_groups_member_updated(
context, port.get(ext_sg.SECURITYGROUPS))
def get_port(self, context, id, fields=None): def get_port(self, context, id, fields=None):
session = context.session with context.session.begin(subtransactions=True):
with session.begin(subtransactions=True):
port = super(NECPluginV2, self).get_port(context, id, fields) port = super(NECPluginV2, self).get_port(context, id, fields)
self._extend_port_dict_security_group(context, port)
self._extend_port_dict_binding(context, port) self._extend_port_dict_binding(context, port)
return self._fields(port, fields) return self._fields(port, fields)
def get_ports(self, context, filters=None, fields=None): def get_ports(self, context, filters=None, fields=None):
session = context.session with context.session.begin(subtransactions=True):
with session.begin(subtransactions=True):
ports = super(NECPluginV2, self).get_ports(context, filters, ports = super(NECPluginV2, self).get_ports(context, filters,
fields) fields)
# TODO(amotoki) filter by security group
for port in ports: for port in ports:
self._extend_port_dict_security_group(context, port)
self._extend_port_dict_binding(context, port) self._extend_port_dict_binding(context, port)
return [self._fields(port, fields) for port in ports] return [self._fields(port, fields) for port in ports]
@ -529,8 +581,52 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
super(NECPluginV2, self).delete_packet_filter(context, id) super(NECPluginV2, self).delete_packet_filter(context, id)
class NECPluginV2RPCCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin, class NECPluginV2AgentNotifierApi(proxy.RpcProxy,
l3_rpc_base.L3RpcCallbackMixin): sg_rpc.SecurityGroupAgentRpcApiMixin):
'''RPC API for NEC plugin agent'''
BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic):
super(NECPluginV2AgentNotifierApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
self.topic_port_update = topics.get_topic_name(
topic, topics.PORT, topics.UPDATE)
def port_update(self, context, port):
self.fanout_cast(context,
self.make_msg('port_update',
port=port),
topic=self.topic_port_update)
class DhcpRpcCallback(dhcp_rpc_base.DhcpRpcCallbackMixin):
# DhcpPluginApi BASE_RPC_API_VERSION
RPC_API_VERSION = '1.0'
class L3RpcCallback(l3_rpc_base.L3RpcCallbackMixin):
# L3PluginApi BASE_RPC_API_VERSION
RPC_API_VERSION = '1.0'
class SecurityGroupServerRpcCallback(
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
RPC_API_VERSION = sg_rpc.SG_RPC_VERSION
@staticmethod
def get_port_from_device(device):
port = ndb.get_port_from_device(device)
if port:
port['device'] = device
LOG.debug(_("NECPluginV2RPCCallbacks.get_port_from_device() called, "
"device=%(device)s => %(ret)s."),
{'device': device, 'ret': port})
return port
class NECPluginV2RPCCallbacks(object):
RPC_API_VERSION = '1.0' RPC_API_VERSION = '1.0'

View File

@ -39,7 +39,7 @@ class TestNecPortsV2(test_plugin.TestPortsV2, NecPluginV2TestCase,
test_bindings.PortBindingsTestCase): test_bindings.PortBindingsTestCase):
VIF_TYPE = portbindings.VIF_TYPE_OVS VIF_TYPE = portbindings.VIF_TYPE_OVS
HAS_SECURITY_GROUP = False HAS_PORT_FILTER = True
class TestNecNetworksV2(test_plugin.TestNetworksV2, NecPluginV2TestCase): class TestNecNetworksV2(test_plugin.TestNetworksV2, NecPluginV2TestCase):

View File

@ -0,0 +1,88 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013, NEC Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import mock
from quantum.api.v2 import attributes
from quantum.extensions import securitygroup as ext_sg
from quantum import manager
from quantum.plugins.nec.db import api as ndb
from quantum.tests.unit import test_extension_security_group as test_sg
from quantum.tests.unit import test_security_groups_rpc as test_sg_rpc
PLUGIN_NAME = ('quantum.plugins.nec.nec_plugin.NECPluginV2')
AGENT_NAME = ('quantum.plugins.nec.agent.nec_quantum_agent.NECQuantumAgent')
NOTIFIER = ('quantum.plugins.nec.nec_plugin.NECPluginV2AgentNotifierApi')
class NecSecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase):
_plugin_name = PLUGIN_NAME
def setUp(self, plugin=None):
self.addCleanup(mock.patch.stopall)
notifier_p = mock.patch(NOTIFIER)
notifier_cls = notifier_p.start()
self.notifier = mock.Mock()
notifier_cls.return_value = self.notifier
self._attribute_map_bk_ = {}
for item in attributes.RESOURCE_ATTRIBUTE_MAP:
self._attribute_map_bk_[item] = (attributes.
RESOURCE_ATTRIBUTE_MAP[item].
copy())
super(NecSecurityGroupsTestCase, self).setUp(PLUGIN_NAME)
def tearDown(self):
super(NecSecurityGroupsTestCase, self).tearDown()
attributes.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk_
class TestNecSecurityGroups(NecSecurityGroupsTestCase,
test_sg.TestSecurityGroups,
test_sg_rpc.SGNotificationTestMixin):
def test_security_group_get_port_from_device(self):
with contextlib.nested(self.network(),
self.security_group()) as (n, sg):
with self.subnet(n):
res = self._create_port(self.fmt, n['network']['id'])
port = self.deserialize(self.fmt, res)
port_id = port['port']['id']
sg_id = sg['security_group']['id']
fixed_ips = port['port']['fixed_ips']
data = {'port': {'fixed_ips': fixed_ips,
'name': port['port']['name'],
ext_sg.SECURITYGROUPS: [sg_id]}}
req = self.new_update_request('ports', data, port_id)
res = self.deserialize(self.fmt,
req.get_response(self.api))
plugin = manager.QuantumManager.get_plugin()
port_dict = plugin.callback_sg.get_port_from_device(port_id)
self.assertEqual(port_id, port_dict['id'])
self.assertEqual([sg_id],
port_dict[ext_sg.SECURITYGROUPS])
self.assertEqual([], port_dict['security_group_rules'])
self.assertEqual([fixed_ips[0]['ip_address']],
port_dict['fixed_ips'])
self._delete('ports', port_id)
class TestNecSecurityGroupsXML(TestNecSecurityGroups):
fmt = 'xml'