Support iptables-based security group in NEC plugin

blueprint nec-security-group

This commit also refactors RPC API and callbacks in the plugin and agent
to support security group RPC.

Change-Id: I09d69ca3aff43e0468bbd5df6367de767af27acc
This commit is contained in:
Akihiro MOTOKI 2013-02-08 22:18:33 +09:00
parent a5c101da4b
commit 687a949f28
8 changed files with 369 additions and 59 deletions

View File

@ -39,6 +39,10 @@ polling_interval = 2
# Change to "sudo" to skip the filtering and just run the comand directly
root_helper = sudo
[SECURITYGROUP]
# Firewall driver for realizing quantum security group function
firewall_driver = quantum.agent.linux.iptables_firewall.OVSHybridIptablesFirewallDriver
[OFC]
# Specify OpenFlow Controller Host, Port and Driver to connect.
host = 127.0.0.1

View File

@ -86,9 +86,9 @@ class SecurityGroupAgentRpcMixin(object):
"""
def init_firewall(self):
LOG.debug(_("Init firewall settings"))
self.firewall = importutils.import_object(
cfg.CONF.SECURITYGROUP.firewall_driver)
firewall_driver = cfg.CONF.SECURITYGROUP.firewall_driver
LOG.debug(_("Init firewall settings (driver=%s)"), firewall_driver)
self.firewall = importutils.import_object(firewall_driver)
def prepare_devices_filter(self, device_ids):
if not device_ids:

View File

@ -33,6 +33,7 @@ migration_for_plugins = [
'quantum.plugins.linuxbridge.lb_quantum_plugin.LinuxBridgePluginV2',
'quantum.plugins.nicira.nicira_nvp_plugin.QuantumPlugin.NvpPluginV2',
'quantum.plugins.openvswitch.ovs_quantum_plugin.OVSQuantumPluginV2',
'quantum.plugins.nec.nec_plugin.NECPluginV2',
]
from alembic import op

View File

@ -20,23 +20,99 @@
# License for the specific language governing permissions and limitations
# under the License.
# @author: Ryota MIBU
# @author: Akihiro MOTOKI
import socket
import sys
import time
import eventlet
from quantum.agent.linux import ovs_lib
from quantum.agent import rpc as agent_rpc
from quantum.agent import securitygroups_rpc as sg_rpc
from quantum.common import config as logging_config
from quantum.common import topics
from quantum import context
from quantum import context as q_context
from quantum.extensions import securitygroup as ext_sg
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import dispatcher
from quantum.openstack.common.rpc import proxy
from quantum.plugins.nec.common import config
LOG = logging.getLogger(__name__)
class NECPluginApi(agent_rpc.PluginApi):
BASE_RPC_API_VERSION = '1.0'
def update_ports(self, context, agent_id, datapath_id,
port_added, port_removed):
"""RPC to update information of ports on Quantum Server"""
LOG.info(_("Update ports: added=%(added)s, "
"removed=%(removed)s"),
{'added': port_added, 'removed': port_removed})
try:
self.call(context,
self.make_msg('update_ports',
topic=topics.AGENT,
agent_id=agent_id,
datapath_id=datapath_id,
port_added=port_added,
port_removed=port_removed))
except Exception as e:
LOG.warn(_("update_ports() failed."))
return
class NECAgentRpcCallback(object):
RPC_API_VERSION = '1.0'
def __init__(self, context, agent, sg_agent):
self.context = context
self.agent = agent
self.sg_agent = sg_agent
def port_update(self, context, **kwargs):
LOG.debug(_("port_update received: %s"), kwargs)
port = kwargs.get('port')
# Validate that port is on OVS
vif_port = self.agent.int_br.get_vif_port_by_id(port['id'])
if not vif_port:
return
if ext_sg.SECURITYGROUPS in port:
self.sg_agent.refresh_firewall()
class SecurityGroupServerRpcApi(proxy.RpcProxy,
sg_rpc.SecurityGroupServerRpcApiMixin):
def __init__(self, topic):
super(SecurityGroupServerRpcApi, self).__init__(
topic=topic, default_version=sg_rpc.SG_RPC_VERSION)
class SecurityGroupAgentRpcCallback(
sg_rpc.SecurityGroupAgentRpcCallbackMixin):
RPC_API_VERSION = sg_rpc.SG_RPC_VERSION
def __init__(self, context, sg_agent):
self.context = context
self.sg_agent = sg_agent
class SecurityGroupAgentRpc(sg_rpc.SecurityGroupAgentRpcMixin):
def __init__(self, context):
self.context = context
self.plugin_rpc = SecurityGroupServerRpcApi(topics.PLUGIN)
self.init_firewall()
class NECQuantumAgent(object):
def __init__(self, integ_br, root_helper, polling_interval):
@ -49,36 +125,46 @@ class NECQuantumAgent(object):
self.int_br = ovs_lib.OVSBridge(integ_br, root_helper)
self.polling_interval = polling_interval
self.datapath_id = "0x%s" % self.int_br.get_datapath_id()
self.setup_rpc()
def setup_rpc(self):
self.host = socket.gethostname()
self.agent_id = 'nec-q-agent.%s' % self.host
self.datapath_id = "0x%s" % self.int_br.get_datapath_id()
LOG.info(_("RPC agent_id: %s"), self.agent_id)
self.topic = topics.AGENT
self.context = q_context.get_admin_context_without_session()
self.plugin_rpc = NECPluginApi(topics.PLUGIN)
self.sg_agent = SecurityGroupAgentRpc(self.context)
# RPC network init
self.context = context.get_admin_context_without_session()
self.conn = rpc.create_connection(new=True)
def update_ports(self, port_added=[], port_removed=[]):
"""RPC to update information of ports on Quantum Server"""
LOG.info(_("Update ports: added=%(port_added)s, "
"removed=%(port_removed)s"),
locals())
try:
rpc.call(self.context,
topics.PLUGIN,
{'method': 'update_ports',
'args': {'topic': topics.AGENT,
'agent_id': self.agent_id,
'datapath_id': self.datapath_id,
'port_added': port_added,
'port_removed': port_removed}})
except Exception as e:
LOG.warn(_("update_ports() failed."))
return
# Handle updates from service
self.callback_nec = NECAgentRpcCallback(self.context,
self, self.sg_agent)
self.callback_sg = SecurityGroupAgentRpcCallback(self.context,
self.sg_agent)
self.dispatcher = dispatcher.RpcDispatcher([self.callback_nec,
self.callback_sg])
# Define the listening consumer for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.SECURITY_GROUP, topics.UPDATE]]
self.connection = agent_rpc.create_consumers(self.dispatcher,
self.topic,
consumers)
def _vif_port_to_port_info(self, vif_port):
return dict(id=vif_port.vif_id, port_no=vif_port.ofport,
mac=vif_port.vif_mac)
def _process_security_group(self, port_added, port_removed):
if port_added:
devices_added = [p['id'] for p in port_added]
self.sg_agent.prepare_devices_filter(devices_added)
if port_removed:
self.sg_agent.remove_devices_filter(port_removed)
def daemon_loop(self):
"""Main processing loop for NEC Plugin Agent."""
old_ports = []
@ -99,7 +185,10 @@ class NECQuantumAgent(object):
port_removed.append(port_id)
if port_added or port_removed:
self.update_ports(port_added, port_removed)
self.plugin_rpc.update_ports(self.context,
self.agent_id, self.datapath_id,
port_added, port_removed)
self._process_security_group(port_added, port_removed)
else:
LOG.debug(_("No port changed."))
@ -108,6 +197,8 @@ class NECQuantumAgent(object):
def main():
eventlet.monkey_patch()
config.CONF(project='quantum')
logging_config.setup_logging(config.CONF)

View File

@ -19,6 +19,10 @@ import sqlalchemy as sa
from quantum.db import api as db
from quantum.db import model_base
from quantum.db import models_v2
from quantum.db import securitygroups_db as sg_db
from quantum.extensions import securitygroup as ext_sg
from quantum import manager
from quantum.openstack.common import log as logging
# NOTE (e0ne): this import is needed for config init
from quantum.plugins.nec.common import config
@ -117,3 +121,29 @@ def del_portinfo(id):
except sa.orm.exc.NoResultFound:
LOG.warning(_("del_portinfo(): NotFound portinfo for "
"port_id: %s"), id)
def get_port_from_device(port_id):
"""Get port from database"""
LOG.debug(_("get_port_with_securitygroups() called:port_id=%s"), port_id)
session = db.get_session()
sg_binding_port = sg_db.SecurityGroupPortBinding.port_id
query = session.query(models_v2.Port,
sg_db.SecurityGroupPortBinding.security_group_id)
query = query.outerjoin(sg_db.SecurityGroupPortBinding,
models_v2.Port.id == sg_binding_port)
query = query.filter(models_v2.Port.id == port_id)
port_and_sgs = query.all()
if not port_and_sgs:
return None
port = port_and_sgs[0][0]
plugin = manager.QuantumManager.get_plugin()
port_dict = plugin._make_port_dict(port)
port_dict[ext_sg.SECURITYGROUPS] = [
sg_id for port, sg_id in port_and_sgs if sg_id]
port_dict['security_group_rules'] = []
port_dict['security_group_source_groups'] = []
port_dict['fixed_ips'] = [ip['ip_address']
for ip in port['fixed_ips']]
return port_dict

View File

@ -15,6 +15,9 @@
# under the License.
# @author: Ryota MIBU
from quantum.agent import securitygroups_rpc as sg_rpc
from quantum.common import constants as q_const
from quantum.common import exceptions as q_exc
from quantum.common import rpc as q_rpc
from quantum.common import topics
from quantum import context
@ -23,9 +26,12 @@ from quantum.db import l3_db
from quantum.db import l3_rpc_base
#NOTE(amotoki): quota_db cannot be removed, it is for db model
from quantum.db import quota_db
from quantum.db import securitygroups_rpc_base as sg_db_rpc
from quantum.extensions import portbindings
from quantum.extensions import securitygroup as ext_sg
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import proxy
from quantum.plugins.nec.common import config
from quantum.plugins.nec.common import exceptions as nexc
from quantum.plugins.nec.db import api as ndb
@ -51,7 +57,9 @@ class OperationalStatus:
ERROR = "ERROR"
class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
class NECPluginV2(nec_plugin_base.NECPluginV2Base,
l3_db.L3_NAT_db_mixin,
sg_db_rpc.SecurityGroupServerRpcMixin):
"""NECPluginV2 controls an OpenFlow Controller.
The Quantum NECPluginV2 maps L2 logical networks to L2 virtualized networks
@ -65,7 +73,8 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
information to and from the plugin.
"""
supported_extension_aliases = ["router", "quotas", "binding"]
supported_extension_aliases = ["router", "quotas", "binding",
"security-group"]
binding_view = "extension:port_binding:view"
binding_set = "extension:port_binding:set"
@ -81,21 +90,28 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
self.setup_rpc()
def setup_rpc(self):
self.topic = topics.PLUGIN
self.conn = rpc.create_connection(new=True)
self.notifier = NECPluginV2AgentNotifierApi(topics.AGENT)
self.callback_nec = NECPluginV2RPCCallbacks(self)
self.callback_dhcp = DhcpRpcCallback()
self.callback_l3 = L3RpcCallback()
self.callback_sg = SecurityGroupServerRpcCallback()
callbacks = [self.callback_nec, self.callback_dhcp,
self.callback_l3, self.callback_sg]
self.dispatcher = q_rpc.PluginRpcDispatcher(callbacks)
self.conn.create_consumer(self.topic, self.dispatcher, fanout=False)
# Consume from all consumers in a thread
self.conn.consume_in_thread()
def _check_view_auth(self, context, resource, action):
return policy.check(context, action, resource)
def _enforce_set_auth(self, context, resource, action):
policy.enforce(context, action, resource)
def setup_rpc(self):
self.topic = topics.PLUGIN
self.conn = rpc.create_connection(new=True)
self.callbacks = NECPluginV2RPCCallbacks(self)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher, fanout=False)
# Consume from all consumers in a thread
self.conn.consume_in_thread()
def _update_resource_status(self, context, resource, id, status):
"""Update status of specified resource."""
request = {}
@ -199,8 +215,12 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
"""Create a new network entry on DB, and create it on OFC."""
LOG.debug(_("NECPluginV2.create_network() called, "
"network=%s ."), network)
session = context.session
with session.begin(subtransactions=True):
#set up default security groups
tenant_id = self._get_tenant_id_for_create(
context, network['network'])
self._ensure_default_security_group(context, tenant_id)
with context.session.begin(subtransactions=True):
new_net = super(NECPluginV2, self).create_network(context, network)
self._process_l3_create(context, network['network'], new_net['id'])
self._extend_network_dict_l3(context, new_net)
@ -337,12 +357,25 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
def create_port(self, context, port):
"""Create a new port entry on DB, then try to activate it."""
LOG.debug(_("NECPluginV2.create_port() called, port=%s ."), port)
new_port = super(NECPluginV2, self).create_port(context, port)
self._update_resource_status(context, "port", new_port['id'],
OperationalStatus.BUILD)
with context.session.begin(subtransactions=True):
self._ensure_default_security_group_on_port(context, port)
sgids = self._get_security_groups_on_port(context, port)
port = super(NECPluginV2, self).create_port(context, port)
self._process_port_create_security_group(
context, port['id'], sgids)
self._extend_port_dict_security_group(context, port)
# Note: In order to allow dhcp packets,
# changes for dhcp ip should be notifified
if port['device_owner'] == q_const.DEVICE_OWNER_DHCP:
self.notifier.security_groups_provider_updated(context)
else:
self.notifier.security_groups_member_updated(
context, port.get(ext_sg.SECURITYGROUPS))
self.activate_port_if_ready(context, new_port)
return self._extend_port_dict_binding(context, new_port)
self._update_resource_status(context, "port", port['id'],
OperationalStatus.BUILD)
self.activate_port_if_ready(context, port)
return self._extend_port_dict_binding(context, port)
def update_port(self, context, id, port):
"""Update port, and handle packetfilters associated with the port.
@ -352,23 +385,37 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
"""
LOG.debug(_("NECPluginV2.update_port() called, "
"id=%(id)s port=%(port)s ."), locals())
old_port = super(NECPluginV2, self).get_port(context, id)
new_port = super(NECPluginV2, self).update_port(context, id, port)
need_port_update_notify = False
with context.session.begin(subtransactions=True):
old_port = super(NECPluginV2, self).get_port(context, id)
new_port = super(NECPluginV2, self).update_port(context, id, port)
need_port_update_notify = self.update_security_group_on_port(
context, id, port, old_port, new_port)
changed = (old_port['admin_state_up'] is
not new_port['admin_state_up'])
need_port_update_notify |= self.is_security_group_member_updated(
context, old_port, new_port)
if need_port_update_notify:
self.notifier.port_update(context, new_port)
changed = (old_port['admin_state_up'] != new_port['admin_state_up'])
if changed:
if new_port['admin_state_up']:
self.activate_port_if_ready(context, new_port)
else:
self.deactivate_port(context, old_port)
# NOTE: _extend_port_dict_security_group() is called in
# update_security_group_on_port() above, so we don't need to
# call it here.
return self._extend_port_dict_binding(context, new_port)
def delete_port(self, context, id, l3_port_check=True):
"""Delete port and packet_filters associated with the port."""
LOG.debug(_("NECPluginV2.delete_port() called, id=%s ."), id)
port = super(NECPluginV2, self).get_port(context, id)
# ext_sg.SECURITYGROUPS attribute for the port is required
# since notifier.security_groups_member_updated() need the attribute.
# Thus we need to call self.get_port() instead of super().get_port()
port = self.get_port(context, id)
self.deactivate_port(context, port)
@ -384,22 +431,27 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
# and l3-router. If so, we should prevent deletion.
if l3_port_check:
self.prevent_l3_port_deletion(context, id)
self.disassociate_floatingips(context, id)
super(NECPluginV2, self).delete_port(context, id)
with context.session.begin(subtransactions=True):
self.disassociate_floatingips(context, id)
self._delete_port_security_group_bindings(context, id)
super(NECPluginV2, self).delete_port(context, id)
self.notifier.security_groups_member_updated(
context, port.get(ext_sg.SECURITYGROUPS))
def get_port(self, context, id, fields=None):
session = context.session
with session.begin(subtransactions=True):
with context.session.begin(subtransactions=True):
port = super(NECPluginV2, self).get_port(context, id, fields)
self._extend_port_dict_security_group(context, port)
self._extend_port_dict_binding(context, port)
return self._fields(port, fields)
def get_ports(self, context, filters=None, fields=None):
session = context.session
with session.begin(subtransactions=True):
with context.session.begin(subtransactions=True):
ports = super(NECPluginV2, self).get_ports(context, filters,
fields)
# TODO(amotoki) filter by security group
for port in ports:
self._extend_port_dict_security_group(context, port)
self._extend_port_dict_binding(context, port)
return [self._fields(port, fields) for port in ports]
@ -529,8 +581,52 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
super(NECPluginV2, self).delete_packet_filter(context, id)
class NECPluginV2RPCCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
l3_rpc_base.L3RpcCallbackMixin):
class NECPluginV2AgentNotifierApi(proxy.RpcProxy,
sg_rpc.SecurityGroupAgentRpcApiMixin):
'''RPC API for NEC plugin agent'''
BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic):
super(NECPluginV2AgentNotifierApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
self.topic_port_update = topics.get_topic_name(
topic, topics.PORT, topics.UPDATE)
def port_update(self, context, port):
self.fanout_cast(context,
self.make_msg('port_update',
port=port),
topic=self.topic_port_update)
class DhcpRpcCallback(dhcp_rpc_base.DhcpRpcCallbackMixin):
# DhcpPluginApi BASE_RPC_API_VERSION
RPC_API_VERSION = '1.0'
class L3RpcCallback(l3_rpc_base.L3RpcCallbackMixin):
# L3PluginApi BASE_RPC_API_VERSION
RPC_API_VERSION = '1.0'
class SecurityGroupServerRpcCallback(
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
RPC_API_VERSION = sg_rpc.SG_RPC_VERSION
@staticmethod
def get_port_from_device(device):
port = ndb.get_port_from_device(device)
if port:
port['device'] = device
LOG.debug(_("NECPluginV2RPCCallbacks.get_port_from_device() called, "
"device=%(device)s => %(ret)s."),
{'device': device, 'ret': port})
return port
class NECPluginV2RPCCallbacks(object):
RPC_API_VERSION = '1.0'

View File

@ -39,7 +39,7 @@ class TestNecPortsV2(test_plugin.TestPortsV2, NecPluginV2TestCase,
test_bindings.PortBindingsTestCase):
VIF_TYPE = portbindings.VIF_TYPE_OVS
HAS_SECURITY_GROUP = False
HAS_PORT_FILTER = True
class TestNecNetworksV2(test_plugin.TestNetworksV2, NecPluginV2TestCase):

View File

@ -0,0 +1,88 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013, NEC Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import mock
from quantum.api.v2 import attributes
from quantum.extensions import securitygroup as ext_sg
from quantum import manager
from quantum.plugins.nec.db import api as ndb
from quantum.tests.unit import test_extension_security_group as test_sg
from quantum.tests.unit import test_security_groups_rpc as test_sg_rpc
PLUGIN_NAME = ('quantum.plugins.nec.nec_plugin.NECPluginV2')
AGENT_NAME = ('quantum.plugins.nec.agent.nec_quantum_agent.NECQuantumAgent')
NOTIFIER = ('quantum.plugins.nec.nec_plugin.NECPluginV2AgentNotifierApi')
class NecSecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase):
_plugin_name = PLUGIN_NAME
def setUp(self, plugin=None):
self.addCleanup(mock.patch.stopall)
notifier_p = mock.patch(NOTIFIER)
notifier_cls = notifier_p.start()
self.notifier = mock.Mock()
notifier_cls.return_value = self.notifier
self._attribute_map_bk_ = {}
for item in attributes.RESOURCE_ATTRIBUTE_MAP:
self._attribute_map_bk_[item] = (attributes.
RESOURCE_ATTRIBUTE_MAP[item].
copy())
super(NecSecurityGroupsTestCase, self).setUp(PLUGIN_NAME)
def tearDown(self):
super(NecSecurityGroupsTestCase, self).tearDown()
attributes.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk_
class TestNecSecurityGroups(NecSecurityGroupsTestCase,
test_sg.TestSecurityGroups,
test_sg_rpc.SGNotificationTestMixin):
def test_security_group_get_port_from_device(self):
with contextlib.nested(self.network(),
self.security_group()) as (n, sg):
with self.subnet(n):
res = self._create_port(self.fmt, n['network']['id'])
port = self.deserialize(self.fmt, res)
port_id = port['port']['id']
sg_id = sg['security_group']['id']
fixed_ips = port['port']['fixed_ips']
data = {'port': {'fixed_ips': fixed_ips,
'name': port['port']['name'],
ext_sg.SECURITYGROUPS: [sg_id]}}
req = self.new_update_request('ports', data, port_id)
res = self.deserialize(self.fmt,
req.get_response(self.api))
plugin = manager.QuantumManager.get_plugin()
port_dict = plugin.callback_sg.get_port_from_device(port_id)
self.assertEqual(port_id, port_dict['id'])
self.assertEqual([sg_id],
port_dict[ext_sg.SECURITYGROUPS])
self.assertEqual([], port_dict['security_group_rules'])
self.assertEqual([fixed_ips[0]['ip_address']],
port_dict['fixed_ips'])
self._delete('ports', port_id)
class TestNecSecurityGroupsXML(TestNecSecurityGroups):
fmt = 'xml'