One Convergence Neutron Plugin l3 ext support

Adding support for l3 extensions and security-groups.

Change-Id: I7007dba1cc8f73496a2a40099581d07ae697520a
Implements: blueprint oc-nvsd-neutron-plugin
This commit is contained in:
Hemanth Ravi 2014-02-17 15:12:54 -08:00 committed by Gerrit Code Review
parent 52fb98b7b6
commit 55c8009a2b
13 changed files with 910 additions and 20 deletions

View File

@ -19,5 +19,13 @@
# Specify 0 to retry until success (default)
# nvsd_retries = 0
[securitygroup]
# Specify firewall_driver option, if neutron security groups are disabled,
# then NoopFirewallDriver otherwise OVSHybridIptablesFirewallDriver.
# firewall_driver = neutron.agent.firewall.NoopFirewallDriver
[agent]
# root_helper = sudo /usr/local/bin/neutron-rootwrap /etc/neutron/rootwrap.conf
[database]
# connection = mysql://root:<passwd>@127.0.0.1/<neutron_db>?charset=utf8

View File

@ -16,7 +16,7 @@ parameters, use the following lines in localrc:
Q_PLUGIN=oneconvergence
disable_service n-net
disable_service q-agt
enable_service q-agt
enable_service q-dhcp
enable_service q-svc
enable_service q-l3

View File

@ -0,0 +1,174 @@
# Copyright 2014 OneConvergence, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Kedar Kulkarni, One Convergence, Inc.
"""NVSD agent code for security group events."""
import socket
import time
import eventlet
from neutron.agent.linux import ovs_lib
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import config as logging_config
from neutron.common import topics
from neutron import context as n_context
from neutron.extensions import securitygroup as ext_sg
from neutron.openstack.common import log as logging
from neutron.openstack.common.rpc import dispatcher
from neutron.openstack.common.rpc import proxy
from neutron.plugins.oneconvergence.lib import config
LOG = logging.getLogger(__name__)
class NVSDAgentRpcCallback(object):
RPC_API_VERSION = '1.0'
def __init__(self, context, agent, sg_agent):
self.context = context
self.agent = agent
self.sg_agent = sg_agent
def port_update(self, context, **kwargs):
LOG.debug(_("port_update received: %s"), kwargs)
port = kwargs.get('port')
# Validate that port is on OVS
vif_port = self.agent.int_br.get_vif_port_by_id(port['id'])
if not vif_port:
return
if ext_sg.SECURITYGROUPS in port:
self.sg_agent.refresh_firewall()
class SecurityGroupServerRpcApi(proxy.RpcProxy,
sg_rpc.SecurityGroupServerRpcApiMixin):
def __init__(self, topic):
super(SecurityGroupServerRpcApi, self).__init__(
topic=topic, default_version=sg_rpc.SG_RPC_VERSION)
class SecurityGroupAgentRpcCallback(
sg_rpc.SecurityGroupAgentRpcCallbackMixin):
RPC_API_VERSION = sg_rpc.SG_RPC_VERSION
def __init__(self, context, sg_agent):
self.context = context
self.sg_agent = sg_agent
class SecurityGroupAgentRpc(sg_rpc.SecurityGroupAgentRpcMixin):
def __init__(self, context, root_helper):
self.context = context
self.plugin_rpc = SecurityGroupServerRpcApi(topics.PLUGIN)
self.root_helper = root_helper
self.init_firewall()
class NVSDNeutronAgent(object):
# history
# 1.0 Initial version
# 1.1 Support Security Group RPC
RPC_API_VERSION = '1.1'
def __init__(self, integ_br, root_helper, polling_interval):
self.int_br = ovs_lib.OVSBridge(integ_br, root_helper)
self.polling_interval = polling_interval
self.root_helper = root_helper
self.setup_rpc()
self.ports = set()
def setup_rpc(self):
self.host = socket.gethostname()
self.agent_id = 'nvsd-q-agent.%s' % self.host
LOG.info(_("RPC agent_id: %s"), self.agent_id)
self.topic = topics.AGENT
self.context = n_context.get_admin_context_without_session()
self.sg_agent = SecurityGroupAgentRpc(self.context,
self.root_helper)
# RPC network init
# Handle updates from service
self.callback_oc = NVSDAgentRpcCallback(self.context,
self, self.sg_agent)
self.callback_sg = SecurityGroupAgentRpcCallback(self.context,
self.sg_agent)
self.dispatcher = dispatcher.RpcDispatcher([self.callback_oc,
self.callback_sg])
# Define the listening consumer for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.SECURITY_GROUP, topics.UPDATE]]
self.connection = agent_rpc.create_consumers(self.dispatcher,
self.topic,
consumers)
def _update_ports(self, registered_ports):
ports = self.int_br.get_vif_port_set()
if ports == registered_ports:
return
added = ports - registered_ports
removed = registered_ports - ports
return {'current': ports,
'added': added,
'removed': removed}
def _process_devices_filter(self, port_info):
if 'added' in port_info:
self.sg_agent.prepare_devices_filter(port_info['added'])
if 'removed' in port_info:
self.sg_agent.remove_devices_filter(port_info['removed'])
def daemon_loop(self):
"""Main processing loop for OC Plugin Agent."""
ports = set()
while True:
try:
port_info = self._update_ports(ports)
if port_info:
LOG.debug(_("Port list is updated"))
self._process_devices_filter(port_info)
ports = port_info['current']
self.ports = ports
except Exception:
LOG.exception(_("Error in agent event loop"))
LOG.debug(_("AGENT looping....."))
time.sleep(self.polling_interval)
def main():
eventlet.monkey_patch()
config.CONF(project='neutron')
logging_config.setup_logging(config.CONF)
integ_br = config.AGENT.integration_bridge
root_helper = config.AGENT.root_helper
polling_interval = config.AGENT.polling_interval
agent = NVSDNeutronAgent(integ_br, root_helper, polling_interval)
LOG.info(_("NVSD Agent initialized successfully, now running... "))
# Start everything.
agent.daemon_loop()

View File

@ -17,6 +17,8 @@
from oslo.config import cfg
from neutron.agent.common import config
NVSD_OPT = [
cfg.StrOpt('nvsd_ip',
@ -38,4 +40,18 @@ NVSD_OPT = [
help=_("Number of login retries to NVSD controller"))
]
agent_opts = [
cfg.StrOpt('integration_bridge', default='br-int',
help=_("integration bridge")),
cfg.IntOpt('polling_interval', default=2,
help=_("The number of seconds the agent will wait between "
"polling for local device changes.")),
]
cfg.CONF.register_opts(NVSD_OPT, "nvsd")
cfg.CONF.register_opts(agent_opts, "AGENT")
config.register_root_helper(cfg.CONF)
CONF = cfg.CONF
AGENT = cfg.CONF.AGENT

View File

@ -0,0 +1,45 @@
# Copyright 2014 OneConvergence, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Kedar Kulkarni, One Convergence, Inc.
from neutron.db import api as db
from neutron.db import models_v2
from neutron.db import securitygroups_db as sg_db
from neutron.extensions import securitygroup as ext_sg
from neutron import manager
def get_port_from_device(port_id):
session = db.get_session()
sg_binding_port = sg_db.SecurityGroupPortBinding.port_id
query = session.query(models_v2.Port,
sg_db.SecurityGroupPortBinding.security_group_id)
query = query.outerjoin(sg_db.SecurityGroupPortBinding,
models_v2.Port.id == sg_binding_port)
query = query.filter(models_v2.Port.id == port_id)
port_and_sgs = query.all()
if not port_and_sgs:
return None
port = port_and_sgs[0][0]
plugin = manager.NeutronManager.get_plugin()
port_dict = plugin._make_port_dict(port)
port_dict[ext_sg.SECURITYGROUPS] = [
sg_id for tport, sg_id in port_and_sgs if sg_id]
port_dict['security_group_rules'] = []
port_dict['security_group_source_groups'] = []
port_dict['fixed_ips'] = [ip['ip_address']
for ip in port['fixed_ips']]
return port_dict

View File

@ -35,6 +35,13 @@ GET_ALL_SUBNETS = "/pluginhandler/ocplugin/tenant/getallsubnets"
PORTS_URI = NETWORK_URI + "/lport/"
PORT_URI = PORTS_URI + "%s"
EXT_URI = "/pluginhandler/ocplugin/ext/tenant/%s"
FLOATING_IPS_URI = EXT_URI + "/floatingip/"
FLOATING_IP_URI = FLOATING_IPS_URI + "%s"
ROUTERS_URI = EXT_URI + "/lrouter/"
ROUTER_URI = ROUTERS_URI + "%s"
METHODS = {"POST": "create",
"PUT": "update",
"DELETE": "delete",
@ -260,3 +267,86 @@ class NVSDApi(object):
tenant_id=tenant_id)
return response.json()
def create_floatingip(self, floating_ip):
tenant_id = floating_ip['tenant_id']
uri = FLOATING_IPS_URI % tenant_id
self.send_request("POST", uri, body=json.dumps(floating_ip),
resource='floating_ip',
tenant_id=tenant_id)
LOG.debug(_("Flatingip %(id)s created under tenant %(tenant_id)s"),
{'id': floating_ip['id'], 'tenant_id': tenant_id})
def update_floatingip(self, floating_ip, floating_ip_update):
tenant_id = floating_ip['tenant_id']
floating_ip_id = floating_ip['id']
uri = FLOATING_IP_URI % (tenant_id, floating_ip_id)
self.send_request("PUT", uri,
body=json.dumps(floating_ip_update['floatingip']),
resource='floating_ip',
tenant_id=tenant_id,
resource_id=floating_ip_id)
LOG.debug(_("Flatingip %(id)s updated under tenant %(tenant_id)s"),
{'id': floating_ip_id, 'tenant_id': tenant_id})
def delete_floatingip(self, floating_ip):
tenant_id = floating_ip['tenant_id']
floating_ip_id = floating_ip['id']
uri = FLOATING_IP_URI % (tenant_id, floating_ip_id)
self.send_request("DELETE", uri, resource='floating_ip',
tenant_id=tenant_id, resource_id=floating_ip_id)
LOG.debug(_("Flatingip %(id)s deleted under tenant %(tenant_id)s"),
{'id': floating_ip_id, 'tenant_id': tenant_id})
def create_router(self, router):
tenant_id = router['tenant_id']
uri = ROUTERS_URI % tenant_id
self.send_request("POST", uri, body=json.dumps(router),
resource='router',
tenant_id=tenant_id)
LOG.debug(_("Router %(id)s created under tenant %(tenant_id)s"),
{'id': router['id'], 'tenant_id': tenant_id})
def update_router(self, router):
tenant_id = router['tenant_id']
router_id = router['id']
uri = ROUTER_URI % (tenant_id, router_id)
self.send_request("PUT", uri,
body=json.dumps(router),
resource='router', tenant_id=tenant_id,
resource_id=router_id)
LOG.debug(_("Router %(id)s updated under tenant %(tenant_id)s"),
{'id': router_id, 'tenant_id': tenant_id})
def delete_router(self, tenant_id, router_id):
uri = ROUTER_URI % (tenant_id, router_id)
self.send_request("DELETE", uri, resource='router',
tenant_id=tenant_id, resource_id=router_id)
LOG.debug(_("Router %(id)s deleted under tenant %(tenant_id)s"),
{'id': router_id, 'tenant_id': tenant_id})

View File

@ -18,6 +18,7 @@
from oslo.config import cfg
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.common import constants as q_const
@ -35,6 +36,7 @@ from neutron.db import l3_gwmode_db
from neutron.db import l3_rpc_base
from neutron.db import portbindings_base
from neutron.db import quota_db # noqa
from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.extensions import portbindings
from neutron.openstack.common import excutils
from neutron.openstack.common import importutils
@ -43,16 +45,16 @@ from neutron.openstack.common import rpc
from neutron.plugins.common import constants as svc_constants
import neutron.plugins.oneconvergence.lib.config # noqa
import neutron.plugins.oneconvergence.lib.exception as nvsdexception
import neutron.plugins.oneconvergence.lib.nvsd_db as nvsd_db
from neutron.plugins.oneconvergence.lib import nvsdlib as nvsd_lib
LOG = logging.getLogger(__name__)
IPv6 = 6
class NVSDRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
l3_rpc_base.L3RpcCallbackMixin):
"""Agent callback."""
class NVSDPluginRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
l3_rpc_base.L3RpcCallbackMixin,
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
RPC_API_VERSION = '1.1'
@ -61,6 +63,31 @@ class NVSDRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
return q_rpc.PluginRpcDispatcher([self,
agents_db.AgentExtRpcCallback()])
@staticmethod
def get_port_from_device(device):
port = nvsd_db.get_port_from_device(device)
if port:
port['device'] = device
return port
class NVSDPluginV2AgentNotifierApi(rpc.proxy.RpcProxy,
sg_rpc.SecurityGroupAgentRpcApiMixin):
BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic):
super(NVSDPluginV2AgentNotifierApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
self.topic_port_update = topics.get_topic_name(topic, topics.PORT,
topics.UPDATE)
def port_update(self, context, port):
self.fanout_cast(context,
self.make_msg('port_update',
port=port,
topic=self.topic_port_update))
class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
extraroute_db.ExtraRoute_db_mixin,
@ -68,7 +95,8 @@ class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
agentschedulers_db.DhcpAgentSchedulerDbMixin,
external_net_db.External_net_db_mixin,
l3_gwmode_db.L3_NAT_db_mixin,
portbindings_base.PortBindingBaseMixin):
portbindings_base.PortBindingBaseMixin,
sg_db_rpc.SecurityGroupServerRpcMixin):
"""L2 Virtual Network Plugin.
@ -80,16 +108,25 @@ class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
__native_pagination_support = True
__native_sorting_support = True
supported_extension_aliases = ['agent',
'binding',
'dhcp_agent_scheduler',
'ext-gw-mode',
'external-net',
'extraroute',
'l3_agent_scheduler',
'quotas',
'router',
]
_supported_extension_aliases = ['agent',
'binding',
'dhcp_agent_scheduler',
'ext-gw-mode',
'external-net',
'extraroute',
'l3_agent_scheduler',
'quotas',
'router',
'security-group'
]
@property
def supported_extension_aliases(self):
if not hasattr(self, '_aliases'):
aliases = self._supported_extension_aliases[:]
sg_rpc.disable_security_group_extension_if_noop_driver(aliases)
self._aliases = aliases
return self._aliases
def __init__(self):
@ -98,7 +135,10 @@ class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
self.oneconvergence_init()
self.base_binding_dict = {
portbindings.VIF_TYPE: portbindings.VIF_TYPE_OVS}
portbindings.VIF_TYPE: portbindings.VIF_TYPE_OVS,
portbindings.VIF_DETAILS: {
portbindings.CAP_PORT_FILTER:
'security-group' in self.supported_extension_aliases}}
portbindings_base.register_port_dict_function()
@ -120,13 +160,14 @@ class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
self.service_topics = {svc_constants.CORE: topics.PLUGIN,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc.create_connection(new=True)
self.notifier = NVSDPluginV2AgentNotifierApi(topics.AGENT)
self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
)
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
l3_rpc_agent_api.L3AgentNotify
)
self.callbacks = NVSDRpcCallbacks()
self.callbacks = NVSDPluginRpcCallbacks()
self.dispatcher = self.callbacks.create_rpc_dispatcher()
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
@ -136,6 +177,10 @@ class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
def create_network(self, context, network):
tenant_id = self._get_tenant_id_for_create(
context, network['network'])
self._ensure_default_security_group(context, tenant_id)
net = self.nvsdlib.create_network(network['network'])
network['network']['id'] = net['id']
@ -224,6 +269,10 @@ class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
def create_port(self, context, port):
self._ensure_default_security_group_on_port(context, port)
sgids = self._get_security_groups_on_port(context, port)
network = {}
network_id = port['port']['network_id']
@ -238,6 +287,8 @@ class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
port['port'],
neutron_port)
self._process_port_create_security_group(context, neutron_port,
sgids)
if port['port']['device_owner'] in ('network:router_gateway',
'network:floatingip'):
# for l3 requests, tenant_id will be None/''
@ -258,12 +309,17 @@ class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
super(OneConvergencePluginV2, self).delete_port(context,
port_id)
self.notify_security_groups_member_updated(context, neutron_port)
return neutron_port
def update_port(self, context, port_id, port):
with context.session.begin(subtransactions=True):
old_port = super(OneConvergencePluginV2, self).get_port(context,
port_id)
neutron_port = super(OneConvergencePluginV2,
self).update_port(context, port_id, port)
@ -279,6 +335,12 @@ class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
self._process_portbindings_create_and_update(context,
port['port'],
neutron_port)
need_port_update_notify = self.update_security_group_on_port(
context, port_id, port, old_port, neutron_port)
if need_port_update_notify:
self.notifier.port_update(context, neutron_port)
return neutron_port
def delete_port(self, context, port_id, l3_port_check=True):
@ -286,9 +348,11 @@ class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
if l3_port_check:
self.prevent_l3_port_deletion(context, port_id)
neutron_port = self._get_port(context, port_id)
with context.session.begin(subtransactions=True):
neutron_port = super(OneConvergencePluginV2,
self).get_port(context, port_id)
self._delete_port_security_group_bindings(context, port_id)
self.disassociate_floatingips(context, port_id)
@ -298,3 +362,82 @@ class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
neutron_port['tenant_id'] = network['tenant_id']
self.nvsdlib.delete_port(port_id, neutron_port)
self.notify_security_groups_member_updated(context, neutron_port)
def create_floatingip(self, context, floatingip):
neutron_floatingip = super(OneConvergencePluginV2,
self).create_floatingip(context,
floatingip)
try:
self.nvsdlib.create_floatingip(neutron_floatingip)
except nvsdexception.NVSDAPIException:
with excutils.save_and_reraise_exception():
LOG.error(_("Failed to create floatingip"))
super(OneConvergencePluginV2,
self).delete_floatingip(context,
neutron_floatingip['id'])
return neutron_floatingip
def update_floatingip(self, context, fip_id, floatingip):
with context.session.begin(subtransactions=True):
neutron_floatingip = super(OneConvergencePluginV2,
self).update_floatingip(context,
fip_id,
floatingip)
self.nvsdlib.update_floatingip(neutron_floatingip, floatingip)
return neutron_floatingip
def delete_floatingip(self, context, floating_ip_id):
with context.session.begin(subtransactions=True):
floating_ip = self._get_floatingip(context, floating_ip_id)
super(OneConvergencePluginV2,
self).delete_floatingip(context, floating_ip_id)
self.nvsdlib.delete_floatingip(floating_ip)
def create_router(self, context, router):
neutron_router = super(OneConvergencePluginV2,
self).create_router(context, router)
try:
self.nvsdlib.create_router(neutron_router)
except nvsdexception.NVSDAPIException:
with excutils.save_and_reraise_exception():
LOG.error(_("Failed to create router"))
super(OneConvergencePluginV2,
self).delete_router(context, neutron_router['id'])
return neutron_router
def update_router(self, context, router_id, router):
with context.session.begin(subtransactions=True):
neutron_router = super(OneConvergencePluginV2,
self).update_router(context, router_id,
router)
self.nvsdlib.update_router(neutron_router)
return neutron_router
def delete_router(self, context, router_id):
tenant_id = self._get_router(context, router_id)['tenant_id']
with context.session.begin(subtransactions=True):
super(OneConvergencePluginV2, self).delete_router(context,
router_id)
self.nvsdlib.delete_router(tenant_id, router_id)

View File

@ -0,0 +1,178 @@
# Copyright 2014 OneConvergence, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Kedar Kulkarni, One Convergence, Inc.
import contextlib
import time
import mock
from oslo.config import cfg
import testtools
from neutron.agent.linux import ovs_lib
from neutron.extensions import securitygroup as ext_sg
from neutron.plugins.oneconvergence.agent import nvsd_neutron_agent
from neutron.tests import base
DAEMON_LOOP_COUNT = 5
class TestOneConvergenceAgentBase(base.BaseTestCase):
def setUp(self):
super(TestOneConvergenceAgentBase, self).setUp()
self.addCleanup(cfg.CONF.reset)
self.addCleanup(mock.patch.stopall)
cfg.CONF.set_override('rpc_backend',
'neutron.openstack.common.rpc.impl_fake')
with contextlib.nested(
mock.patch('neutron.openstack.common.loopingcall.'
'FixedIntervalLoopingCall'),
) as (loopingcall):
kwargs = {'integ_br': 'integration_bridge',
'root_helper': 'dummy_wrapper',
'polling_interval': 5}
context = mock.Mock()
self.agent = nvsd_neutron_agent.NVSDNeutronAgent(**kwargs)
self.sg_agent = nvsd_neutron_agent.SecurityGroupAgentRpc(
context, 'dummy_wrapper')
self.callback_nvsd = nvsd_neutron_agent.NVSDAgentRpcCallback(
context, self.agent, self.sg_agent)
self.loopingcall = loopingcall
class TestOneConvergenceAgentCallback(TestOneConvergenceAgentBase):
def test_port_update(self):
with contextlib.nested(
mock.patch.object(ovs_lib.OVSBridge, 'get_vif_port_by_id'),
mock.patch.object(self.sg_agent, 'refresh_firewall')
) as (get_vif_port_by_id, refresh_firewall):
context = mock.Mock()
vifport = ovs_lib.VifPort('port1', '1', 'id-1', 'mac-1',
self.agent.int_br)
# The OVS port does not exist.
get_vif_port_by_id.return_value = None
port = {'id': 'update-port-1'}
self.callback_nvsd.port_update(context, port=port)
self.assertEqual(get_vif_port_by_id.call_count, 1)
self.assertFalse(refresh_firewall.call_count)
# The OVS port exists but no security group is associated.
get_vif_port_by_id.return_value = vifport
port = {'id': 'update-port-1'}
self.callback_nvsd.port_update(context, port=port)
self.assertEqual(get_vif_port_by_id.call_count, 2)
self.assertFalse(refresh_firewall.call_count)
# The OVS port exists but a security group is associated.
get_vif_port_by_id.return_value = vifport
port = {'id': 'update-port-1',
ext_sg.SECURITYGROUPS: ['default']}
self.callback_nvsd.port_update(context, port=port)
self.assertEqual(get_vif_port_by_id.call_count, 3)
self.assertEqual(refresh_firewall.call_count, 1)
get_vif_port_by_id.return_value = None
port = {'id': 'update-port-1',
ext_sg.SECURITYGROUPS: ['default']}
self.callback_nvsd.port_update(context, port=port)
self.assertEqual(get_vif_port_by_id.call_count, 4)
self.assertEqual(refresh_firewall.call_count, 1)
class TestNVSDAgent(TestOneConvergenceAgentBase):
def _setup_mock(self):
self.get_vif_ports = mock.patch.object(
ovs_lib.OVSBridge, 'get_vif_port_set',
return_value=set(['id-1', 'id-2'])).start()
self.prepare_devices_filter = mock.patch.object(
self.agent.sg_agent, 'prepare_devices_filter').start()
self.remove_devices_filter = mock.patch.object(
self.agent.sg_agent, 'remove_devices_filter').start()
def test_daemon_loop(self):
def state_check(index):
self.assertEqual(len(self.vif_ports_scenario[index]),
len(self.agent.ports))
# Fake time.sleep to stop the infinite loop in daemon_loop()
self.sleep_count = 0
def sleep_mock(*args, **kwargs):
state_check(self.sleep_count)
self.sleep_count += 1
if self.sleep_count >= DAEMON_LOOP_COUNT:
raise RuntimeError()
self.vif_ports_scenario = [set(), set(), set(), set(['id-1', 'id-2']),
set(['id-2', 'id-3'])]
# Ensure vif_ports_scenario is longer than DAEMON_LOOP_COUNT
if len(self.vif_ports_scenario) < DAEMON_LOOP_COUNT:
self.vif_ports_scenario.extend(
[] for _i in xrange(DAEMON_LOOP_COUNT -
len(self.vif_ports_scenario)))
with contextlib.nested(
mock.patch.object(time, 'sleep', side_effect=sleep_mock),
mock.patch.object(ovs_lib.OVSBridge, 'get_vif_port_set'),
mock.patch.object(self.agent.sg_agent, 'prepare_devices_filter'),
mock.patch.object(self.agent.sg_agent, 'remove_devices_filter')
) as (sleep, get_vif_port_set, prepare_devices_filter,
remove_devices_filter):
get_vif_port_set.side_effect = self.vif_ports_scenario
with testtools.ExpectedException(RuntimeError):
self.agent.daemon_loop()
self.assertEqual(sleep.call_count, DAEMON_LOOP_COUNT)
expected = [mock.call(set(['id-1', 'id-2'])),
mock.call(set(['id-3']))]
self.assertEqual(prepare_devices_filter.call_count, 2)
prepare_devices_filter.assert_has_calls(expected)
expected = [mock.call(set([])), mock.call(set(['id-1']))]
self.assertEqual(remove_devices_filter.call_count, 2)
remove_devices_filter.assert_has_calls(expected)
sleep.assert_called_with(self.agent.polling_interval)
class TestOneConvergenceAgentMain(base.BaseTestCase):
def test_main(self):
with contextlib.nested(
mock.patch.object(nvsd_neutron_agent, 'NVSDNeutronAgent'),
mock.patch('eventlet.monkey_patch'),
mock.patch.object(nvsd_neutron_agent, 'logging_config'),
mock.patch.object(nvsd_neutron_agent, 'config')
) as (agent, eventlet, logging_config, config):
config.AGENT.integration_bridge = 'br-int-dummy'
config.AGENT.root_helper = 'root-helper'
config.AGENT.polling_interval = 5
nvsd_neutron_agent.main()
self.assertTrue(eventlet.called)
self.assertTrue(logging_config.setup_logging.called)
agent.assert_has_calls([
mock.call('br-int-dummy', 'root-helper', 5),
mock.call().daemon_loop()
])

View File

@ -27,6 +27,7 @@ from neutron.manager import NeutronManager
from neutron.plugins.oneconvergence import plugin as nvsd_plugin
from neutron.tests.unit import _test_extension_portbindings as test_bindings
from neutron.tests.unit import test_db_plugin as test_plugin
from neutron.tests.unit import test_l3_plugin
PLUGIN_NAME = 'neutron.plugins.oneconvergence.plugin.OneConvergencePluginV2'
@ -107,3 +108,27 @@ class TestOneConvergenceBasicGet(test_plugin.TestBasicGet,
class TestOneConvergenceV2HTTPResponse(test_plugin.TestV2HTTPResponse,
OneConvergencePluginV2TestCase):
pass
class TestOneConvergenceL3NatTestCase(test_l3_plugin.L3NatDBIntTestCase):
_plugin_name = PLUGIN_NAME
def setUp(self):
def mocked_oneconvergence_init(self):
def side_effect(*args, **kwargs):
return {'id': str(uuid.uuid4())}
self.nvsdlib = mock.Mock()
self.nvsdlib.create_network.side_effect = side_effect
self.addCleanup(mock.patch.stopall)
ext_mgr = test_l3_plugin.L3TestExtensionManager()
with mock.patch.object(nvsd_plugin.OneConvergencePluginV2,
'oneconvergence_init',
new=mocked_oneconvergence_init):
super(TestOneConvergenceL3NatTestCase,
self).setUp(plugin=self._plugin_name, ext_mgr=ext_mgr)
def test_floatingip_with_invalid_create_port(self):
self._test_floatingip_with_invalid_create_port(self._plugin_name)

View File

@ -30,9 +30,18 @@ GET_ALL_SUBNETS = "/pluginhandler/ocplugin/tenant/getallsubnets"
PORTS_URI = NETWORK_URI + "/lport/"
PORT_URI = PORTS_URI + "%s"
EXT_URI = "/pluginhandler/ocplugin/ext/tenant/%s"
FLOATING_IPS_URI = EXT_URI + "/floatingip/"
FLOATING_IP_URI = FLOATING_IPS_URI + "%s"
ROUTERS_URI = EXT_URI + "/lrouter/"
ROUTER_URI = ROUTERS_URI + "%s"
TEST_NET = 'test-network'
TEST_SUBNET = 'test-subnet'
TEST_PORT = 'test-port'
TEST_FIP = 'test-floatingip'
TEST_ROUTER = 'test-router'
TEST_TENANT = 'test-tenant'
@ -184,3 +193,69 @@ class TestNVSDApi(base.BaseTestCase):
resource='subnet',
tenant_id=TEST_TENANT,
resource_id=TEST_SUBNET)
def test_create_floatingip(self):
floatingip = {'id': TEST_FIP,
'tenant_id': TEST_TENANT}
uri = FLOATING_IPS_URI % TEST_TENANT
with mock.patch.object(self.nvsdlib, 'send_request') as send_request:
self.nvsdlib.create_floatingip(floatingip)
send_request.assert_called_once_with("POST", uri,
body=json.dumps(floatingip),
resource='floating_ip',
tenant_id=TEST_TENANT)
def test_update_floatingip(self):
floatingip = {'id': TEST_FIP,
'tenant_id': TEST_TENANT}
uri = FLOATING_IP_URI % (TEST_TENANT, TEST_FIP)
floatingip_update = {'floatingip': {'router_id': TEST_ROUTER}}
with mock.patch.object(self.nvsdlib, 'send_request') as send_request:
self.nvsdlib.update_floatingip(floatingip, floatingip_update)
send_request.assert_called_once_with(
"PUT", uri, body=json.dumps(floatingip_update['floatingip']),
resource='floating_ip', tenant_id=TEST_TENANT,
resource_id=TEST_FIP)
def test_delete_floatingip(self):
floatingip = {'id': TEST_FIP,
'tenant_id': TEST_TENANT}
uri = FLOATING_IP_URI % (TEST_TENANT, TEST_FIP)
with mock.patch.object(self.nvsdlib, 'send_request') as send_request:
self.nvsdlib.delete_floatingip(floatingip)
send_request.assert_called_once_with(
"DELETE", uri, resource='floating_ip', tenant_id=TEST_TENANT,
resource_id=TEST_FIP)
def test_create_router(self):
router = {'id': TEST_ROUTER, 'tenant_id': TEST_TENANT}
uri = ROUTERS_URI % TEST_TENANT
with mock.patch.object(self.nvsdlib, 'send_request') as send_request:
self.nvsdlib.create_router(router)
send_request.assert_called_once_with(
"POST", uri, body=json.dumps(router), resource='router',
tenant_id=TEST_TENANT)
def test_update_router(self):
router = {'id': TEST_ROUTER, 'tenant_id': TEST_TENANT}
uri = ROUTER_URI % (TEST_TENANT, TEST_ROUTER)
with mock.patch.object(self.nvsdlib, 'send_request') as send_request:
self.nvsdlib.update_router(router)
send_request.assert_called_once_with(
"PUT", uri, body=json.dumps(router),
resource='router', tenant_id=TEST_TENANT,
resource_id=TEST_ROUTER)
def test_delete_router(self):
uri = ROUTER_URI % (TEST_TENANT, TEST_ROUTER)
with mock.patch.object(self.nvsdlib, 'send_request') as send_request:
self.nvsdlib.delete_router(TEST_TENANT, TEST_ROUTER)
send_request.assert_called_once_with(
"DELETE", uri, resource='router',
tenant_id=TEST_TENANT, resource_id=TEST_ROUTER)

View File

@ -0,0 +1,134 @@
# Copyright 2014 OneConvergence, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Kedar Kulkarni, One Convergence, Inc.
import uuid
import mock
from neutron.api.v2 import attributes
from neutron.extensions import securitygroup as ext_sg
from neutron import manager
from neutron.plugins.oneconvergence import plugin as nvsd_plugin
from neutron.tests.unit import test_extension_security_group as test_sg
from neutron.tests.unit import test_security_groups_rpc as test_sg_rpc
PLUGIN_NAME = ('neutron.plugins.oneconvergence.'
'plugin.OneConvergencePluginV2')
AGENTNOTIFIER = ('neutron.plugins.oneconvergence.'
'plugin.NVSDPluginV2AgentNotifierApi')
DUMMY_NVSD_LIB = ('neutron.tests.unit.oneconvergence.dummynvsdlib.NVSDApi')
class OneConvergenceSecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase):
_plugin_name = PLUGIN_NAME
def setUp(self):
def mocked_oneconvergence_init(self):
def side_effect(*args, **kwargs):
return {'id': str(uuid.uuid4())}
self.nvsdlib = mock.Mock()
self.nvsdlib.create_network.side_effect = side_effect
self.addCleanup(mock.patch.stopall)
test_sg_rpc.set_firewall_driver(test_sg_rpc.FIREWALL_HYBRID_DRIVER)
self.addCleanup(mock.patch.stopall)
notifier_cls = mock.patch(AGENTNOTIFIER).start()
self.notifier = mock.Mock()
notifier_cls.return_value = self.notifier
self._attribute_map_bk_ = {}
for item in attributes.RESOURCE_ATTRIBUTE_MAP:
self._attribute_map_bk_[item] = (attributes.
RESOURCE_ATTRIBUTE_MAP[item].
copy())
with mock.patch.object(nvsd_plugin.OneConvergencePluginV2,
'oneconvergence_init',
new=mocked_oneconvergence_init):
super(OneConvergenceSecurityGroupsTestCase,
self).setUp(PLUGIN_NAME)
def tearDown(self):
super(OneConvergenceSecurityGroupsTestCase, self).tearDown()
attributes.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk_
class TestOneConvergenceSGServerRpcCallBack(
OneConvergenceSecurityGroupsTestCase,
test_sg_rpc.SGServerRpcCallBackMixinTestCase):
def test_security_group_rules_for_devices_ipv6_egress(self):
self.skipTest("NVSD Plugin does not support IPV6.")
def test_security_group_rules_for_devices_ipv6_ingress(self):
self.skipTest("NVSD Plugin does not support IPV6.")
def test_security_group_rules_for_devices_ipv6_source_group(self):
self.skipTest("NVSD Plugin does not support IPV6.")
class TestOneConvergenceSGServerRpcCallBackXML(
OneConvergenceSecurityGroupsTestCase,
test_sg_rpc.SGServerRpcCallBackMixinTestCaseXML):
def test_security_group_rules_for_devices_ipv6_egress(self):
self.skipTest("NVSD Plugin does not support IPV6.")
def test_security_group_rules_for_devices_ipv6_ingress(self):
self.skipTest("NVSD Plugin does not support IPV6.")
def test_security_group_rules_for_devices_ipv6_source_group(self):
self.skipTest("NVSD Plugin does not support IPV6.")
class TestOneConvergenceSecurityGroups(OneConvergenceSecurityGroupsTestCase,
test_sg.TestSecurityGroups,
test_sg_rpc.SGNotificationTestMixin):
def test_security_group_get_port_from_device(self):
with self.network() as n:
with self.subnet(n):
with self.security_group() as sg:
security_group_id = sg['security_group']['id']
res = self._create_port(self.fmt, n['network']['id'])
port = self.deserialize(self.fmt, res)
fixed_ips = port['port']['fixed_ips']
data = {'port': {'fixed_ips': fixed_ips,
'name': port['port']['name'],
ext_sg.SECURITYGROUPS:
[security_group_id]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize(self.fmt,
req.get_response(self.api))
port_id = res['port']['id']
plugin = manager.NeutronManager.get_plugin()
port_dict = plugin.callbacks.get_port_from_device(port_id)
self.assertEqual(port_id, port_dict['id'])
self.assertEqual([security_group_id],
port_dict[ext_sg.SECURITYGROUPS])
self.assertEqual([], port_dict['security_group_rules'])
self.assertEqual([fixed_ips[0]['ip_address']],
port_dict['fixed_ips'])
self._delete('ports', port_id)
def test_security_group_get_port_from_device_with_no_port(self):
plugin = manager.NeutronManager.get_plugin()
port_dict = plugin.callbacks.get_port_from_device('bad_device_id')
self.assertIsNone(port_dict)
class TestOneConvergenceSecurityGroupsXML(TestOneConvergenceSecurityGroups):
fmt = 'xml'

View File

@ -103,6 +103,7 @@ console_scripts =
neutron-netns-cleanup = neutron.agent.netns_cleanup_util:main
neutron-ns-metadata-proxy = neutron.agent.metadata.namespace_proxy:main
neutron-nsx-manage = neutron.plugins.vmware.shell:main
neutron-nvsd-agent = neutron.plugins.oneconvergence.agent.nvsd_neutron_agent:main
neutron-openvswitch-agent = neutron.plugins.openvswitch.agent.ovs_neutron_agent:main
neutron-ovs-cleanup = neutron.agent.ovs_cleanup_util:main
neutron-restproxy-agent = neutron.plugins.bigswitch.agent.restproxy_agent:main
@ -125,6 +126,7 @@ console_scripts =
quantum-nec-agent = neutron.plugins.nec.agent.nec_neutron_agent:main
quantum-netns-cleanup = neutron.agent.netns_cleanup_util:main
quantum-ns-metadata-proxy = neutron.agent.metadata.namespace_proxy:main
quantum-nvsd-agent = neutron.plugins.oneconvergence.agent.nvsd_neutron_agent:main
quantum-openvswitch-agent = neutron.plugins.openvswitch.agent.ovs_neutron_agent:main
quantum-ovs-cleanup = neutron.agent.ovs_cleanup_util:main
quantum-ryu-agent = neutron.plugins.ryu.agent.ryu_neutron_agent:main