Merge "Add simple_dvs_plugin"

This commit is contained in:
Jenkins 2015-03-05 16:51:05 +00:00 committed by Gerrit Code Review
commit 313542ccd6
8 changed files with 834 additions and 0 deletions

View File

@ -17,3 +17,4 @@ oslo.i18n>=1.0.0 # Apache-2.0
oslo.log>=0.1.0 # Apache-2.0 oslo.log>=0.1.0 # Apache-2.0
oslo.serialization>=1.0.0 # Apache-2.0 oslo.serialization>=1.0.0 # Apache-2.0
oslo.utils>=1.1.0 # Apache-2.0 oslo.utils>=1.1.0 # Apache-2.0
oslo.vmware>=0.9.0 # Apache-2.0

View File

@ -0,0 +1,135 @@
# Copyright 2014 VMware, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.utils import excutils
from oslo.vmware import vim_util
from neutron.common import exceptions
from neutron.openstack.common import log as logging
from vmware_nsx.neutron.plugins.vmware.common import exceptions as nsx_exc
from vmware_nsx.neutron.plugins.vmware.dvs import dvs_utils
LOG = logging.getLogger(__name__)
class DvsManager(object):
"""Management class for dvs related tasks."""
def __init__(self):
"""Initializer.
A global session with the VC will be established. In addition to this
the moref of the configured DVS will be learnt. This will be used in
the operations supported by the manager.
NOTE: the DVS port group name will be the Neutron network UUID.
"""
self._session = dvs_utils.dvs_create_session()
# In the future we may decide to support more than one DVS
self._dvs_moref = self._get_dvs_moref(self._session,
dvs_utils.dvs_name_get())
def _get_dvs_moref(self, session, dvs_name):
"""Get the moref of the configured DVS."""
results = session.invoke_api(vim_util,
'get_objects',
session.vim,
'DistributedVirtualSwitch',
100)
while results:
for dvs in results.objects:
for prop in dvs.propSet:
if dvs_name == prop.val:
vim_util.cancel_retrieval(session.vim, results)
return dvs.obj
results = vim_util.continue_retrieval(session.vim, results)
raise nsx_exc.DvsNotFound(dvs=dvs_name)
def _get_port_group_spec(self, net_id, vlan_tag):
"""Gets the port groups spec for net_id and vlan_tag."""
client_factory = self._session.vim.client.factory
pg_spec = client_factory.create('ns0:DVPortgroupConfigSpec')
pg_spec.name = net_id
pg_spec.type = 'ephemeral'
config = client_factory.create('ns0:VMwareDVSPortSetting')
if vlan_tag:
# Create the spec for the vlan tag
spec_ns = 'ns0:VmwareDistributedVirtualSwitchVlanIdSpec'
vl_spec = client_factory.create(spec_ns)
vl_spec.vlanId = vlan_tag
vl_spec.inherited = '0'
config.vlan = vl_spec
pg_spec.defaultPortConfig = config
return pg_spec
def add_port_group(self, net_id, vlan_tag=None):
"""Add a new port group to the configured DVS."""
pg_spec = self._get_port_group_spec(net_id, vlan_tag)
task = self._session.invoke_api(self._session.vim,
'CreateDVPortgroup_Task',
self._dvs_moref,
spec=pg_spec)
try:
# NOTE(garyk): cache the returned moref
self._session.wait_for_task(task)
except Exception:
# NOTE(garyk): handle more specific exceptions
with excutils.save_and_reraise_exception():
LOG.exception(_('Failed to create port group for '
'%(net_id)s with tag %(tag)s.'),
{'net_id': net_id, 'tag': vlan_tag})
LOG.info("%(net_id)s with tag %(vlan_tag)s created on %(dvs)s.",
{'net_id': net_id,
'vlan_tag': vlan_tag,
'dvs': dvs_utils.dvs_name_get()})
def _net_id_to_moref(self, net_id):
"""Gets the moref for the specific neutron network."""
# NOTE(garyk): return this from a cache if not found then invoke
# code below.
port_groups = self._session.invoke_api(vim_util,
'get_object_properties',
self._session.vim,
self._dvs_moref,
['portgroup'])
if len(port_groups) and hasattr(port_groups[0], 'propSet'):
for prop in port_groups[0].propSet:
for val in prop.val[0]:
props = self._session.invoke_api(vim_util,
'get_object_properties',
self._session.vim,
val, ['name'])
if len(props) and hasattr(props[0], 'propSet'):
for prop in props[0].propSet:
if net_id == prop.val:
# NOTE(garyk): update cache
return val
raise exceptions.NetworkNotFound(net_id=net_id)
def delete_port_group(self, net_id):
"""Delete a specific port group."""
moref = self._net_id_to_moref(net_id)
task = self._session.invoke_api(self._session.vim,
'Destroy_Task',
moref)
try:
self._session.wait_for_task(task)
except Exception:
# NOTE(garyk): handle more specific exceptions
with excutils.save_and_reraise_exception():
LOG.exception(_('Failed to delete port group for %s.'), net_id)
LOG.info("%(net_id)s delete from %(dvs)s.",
{'net_id': net_id,
'dvs': dvs_utils.dvs_name_get()})

View File

@ -0,0 +1,60 @@
# Copyright 2014 VMware, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from oslo.vmware import api
dvs_opts = [
cfg.StrOpt('host_ip',
help='Hostname or IP address for connection to VMware VC '
'host.'),
cfg.IntOpt('host_port', default=443,
help='Port for connection to VMware VC host.'),
cfg.StrOpt('host_username',
help='Username for connection to VMware VC host.'),
cfg.StrOpt('host_password',
help='Password for connection to VMware VC host.',
secret=True),
cfg.FloatOpt('task_poll_interval',
default=0.5,
help='The interval used for polling of remote tasks.'),
cfg.IntOpt('api_retry_count',
default=10,
help='The number of times we retry on failures, e.g., '
'socket error, etc.'),
cfg.StrOpt('dvs_name',
help='The name of the preconfigured DVS.'),
]
CONF = cfg.CONF
CONF.register_opts(dvs_opts, 'dvs')
def dvs_is_enabled():
"""Returns the configured DVS status."""
return bool(CONF.dvs.host_ip and CONF.dvs.host_username and
CONF.dvs.host_password and CONF.dvs.dvs_name)
def dvs_create_session():
return api.VMwareAPISession(CONF.dvs.host_ip,
CONF.dvs.host_username,
CONF.dvs.host_password,
CONF.dvs.api_retry_count,
CONF.dvs.task_poll_interval,
port=CONF.dvs.host_port)
def dvs_name_get():
return CONF.dvs.dvs_name

View File

@ -16,7 +16,9 @@
# #
from vmware_nsx.neutron.plugins.vmware.plugins import base from vmware_nsx.neutron.plugins.vmware.plugins import base
from vmware_nsx.neutron.plugins.vmware.plugins import dvs
from vmware_nsx.neutron.plugins.vmware.plugins import nsx_v from vmware_nsx.neutron.plugins.vmware.plugins import nsx_v
NsxPlugin = base.NsxPluginV2 NsxPlugin = base.NsxPluginV2
NsxVPlugin = nsx_v.NsxVPluginV2 NsxVPlugin = nsx_v.NsxVPluginV2
NsxDvsPlugin = dvs.NsxDvsV2

View File

@ -0,0 +1,383 @@
# Copyright 2012 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from oslo.utils import excutils
from oslo_log import log as logging
from neutron.api import extensions as neutron_extensions
from neutron.api.v2 import attributes as attr
from neutron.common import exceptions as n_exc
from neutron.db import agentschedulers_db
from neutron.db import allowedaddresspairs_db as addr_pair_db
from neutron.db import db_base_plugin_v2
from neutron.db import external_net_db
from neutron.db import portbindings_db
from neutron.db import portsecurity_db
from neutron.extensions import allowedaddresspairs as addr_pair
from neutron.extensions import multiprovidernet as mpnet
from neutron.extensions import portbindings as pbin
from neutron.extensions import portsecurity as psec
from neutron.extensions import providernet as pnet
from vmware_nsx.neutron.plugins import vmware
from vmware_nsx.neutron.plugins.vmware.common import config # noqa
from vmware_nsx.neutron.plugins.vmware.common import exceptions as nsx_exc
from vmware_nsx.neutron.plugins.vmware.common import utils as c_utils
from vmware_nsx.neutron.plugins.vmware.dbexts import db as nsx_db
from vmware_nsx.neutron.plugins.vmware import dhcpmeta_modes
from vmware_nsx.neutron.plugins.vmware.dvs import dvs
from vmware_nsx.neutron.plugins.vmware.dvs import dvs_utils
LOG = logging.getLogger(__name__)
class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin,
agentschedulers_db.DhcpAgentSchedulerDbMixin,
db_base_plugin_v2.NeutronDbPluginV2,
dhcpmeta_modes.DhcpMetadataAccess,
external_net_db.External_net_db_mixin,
portbindings_db.PortBindingMixin,
portsecurity_db.PortSecurityDbMixin):
supported_extension_aliases = ["allowed-address-pairs",
"binding",
"mac-learning",
"multi-provider",
"port-security",
"provider",
"quotas",
"external-net"]
__native_bulk_support = True
__native_pagination_support = True
__native_sorting_support = True
def __init__(self):
super(NsxDvsV2, self).__init__()
config.validate_config_options()
LOG.debug('Driver support: DVS: %s' % dvs_utils.dvs_is_enabled())
neutron_extensions.append_api_extensions_path([vmware.NSX_EXT_PATH])
self._dvs = dvs.DvsManager()
# Common driver code
self.base_binding_dict = {
pbin.VIF_TYPE: pbin.VIF_TYPE_DVS,
pbin.VIF_DETAILS: {
# TODO(rkukura): Replace with new VIF security details
pbin.CAP_PORT_FILTER:
'security-group' in self.supported_extension_aliases}}
self.setup_dhcpmeta_access()
def _extend_network_dict_provider(self, context, network,
multiprovider=None, bindings=None):
if not bindings:
bindings = nsx_db.get_network_bindings(context.session,
network['id'])
if not multiprovider:
multiprovider = nsx_db.is_multiprovider_network(context.session,
network['id'])
# With NSX plugin 'normal' overlay networks will have no binding
# TODO(salvatore-orlando) make sure users can specify a distinct
# phy_uuid as 'provider network' for STT net type
if bindings:
if not multiprovider:
# network came in through provider networks api
network[pnet.NETWORK_TYPE] = bindings[0].binding_type
network[pnet.PHYSICAL_NETWORK] = bindings[0].phy_uuid
network[pnet.SEGMENTATION_ID] = bindings[0].vlan_id
else:
# network come in though multiprovider networks api
network[mpnet.SEGMENTS] = [
{pnet.NETWORK_TYPE: binding.binding_type,
pnet.PHYSICAL_NETWORK: binding.phy_uuid,
pnet.SEGMENTATION_ID: binding.vlan_id}
for binding in bindings]
def _dvs_get_id(self, net_data):
if net_data['name'] == '':
return net_data['id']
else:
# Maximum name length is 80 characters. 'id' length is 36
# maximum prefix for name is 43
return '%s-%s' % (net_data['name'][:43], net_data['id'])
def _dvs_create_network(self, context, network):
net_data = network['network']
if net_data['admin_state_up'] is False:
LOG.warning(_("Network with admin_state_up=False are not yet "
"supported by this plugin. Ignoring setting for "
"network %s"), net_data.get('name', '<unknown>'))
net_data['id'] = str(uuid.uuid4())
vlan_tag = 0
if net_data.get(pnet.NETWORK_TYPE) == c_utils.NetworkTypes.VLAN:
vlan_tag = net_data.get(pnet.SEGMENTATION_ID, 0)
dvs_id = self._dvs_get_id(net_data)
self._dvs.add_port_group(dvs_id, vlan_tag)
try:
with context.session.begin(subtransactions=True):
new_net = super(NsxDvsV2, self).create_network(context,
network)
# Process port security extension
self._process_network_port_security_create(
context, net_data, new_net)
nsx_db.add_network_binding(
context.session, new_net['id'],
net_data.get(pnet.NETWORK_TYPE),
net_data.get(pnet.PHYSICAL_NETWORK),
vlan_tag)
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_('Failed to create network'))
self._dvs.delete_port_group(dvs_id)
new_net[pnet.NETWORK_TYPE] = net_data.get(pnet.NETWORK_TYPE)
new_net[pnet.PHYSICAL_NETWORK] = 'dvs'
new_net[pnet.SEGMENTATION_ID] = vlan_tag
self.handle_network_dhcp_access(context, new_net,
action='create_network')
return new_net
def create_network(self, context, network):
return self._dvs_create_network(context, network)
def _dvs_delete_network(self, context, id):
network = self._get_network(context, id)
dvs_id = self._dvs_get_id(network)
super(NsxDvsV2, self).delete_network(context, id)
try:
self._dvs.delete_port_group(dvs_id)
except Exception:
LOG.exception(_('Unable to delete DVS port group %s'), id)
self.handle_network_dhcp_access(context, id, action='delete_network')
def delete_network(self, context, id):
self._dvs_delete_network(context, id)
def _dvs_get_network(self, context, id, fields=None):
with context.session.begin(subtransactions=True):
# goto to the plugin DB and fetch the network
network = self._get_network(context, id)
# Don't do field selection here otherwise we won't be able
# to add provider networks fields
net_result = self._make_network_dict(network)
self._extend_network_dict_provider(context, net_result)
return self._fields(net_result, fields)
def get_network(self, context, id, fields=None):
return self._dvs_get_network(context, id, fields=None)
def get_networks(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
filters = filters or {}
with context.session.begin(subtransactions=True):
networks = (
super(NsxDvsV2, self).get_networks(
context, filters, fields, sorts,
limit, marker, page_reverse))
for net in networks:
self._extend_network_dict_provider(context, net)
return [self._fields(network, fields) for network in networks]
def update_network(self, context, id, network):
raise nsx_exc.NsxPluginException(
err_msg=_("Unable to update DVS network"))
def create_port(self, context, port):
# If PORTSECURITY is not the default value ATTR_NOT_SPECIFIED
# then we pass the port to the policy engine. The reason why we don't
# pass the value to the policy engine when the port is
# ATTR_NOT_SPECIFIED is for the case where a port is created on a
# shared network that is not owned by the tenant.
port_data = port['port']
with context.session.begin(subtransactions=True):
# First we allocate port in neutron database
neutron_db = super(NsxDvsV2, self).create_port(context, port)
# Update fields obtained from neutron db (eg: MAC address)
port["port"].update(neutron_db)
self.handle_port_metadata_access(context, neutron_db)
# port security extension checks
(port_security, has_ip) = self._determine_port_security_and_has_ip(
context, port_data)
port_data[psec.PORTSECURITY] = port_security
self._process_port_port_security_create(
context, port_data, neutron_db)
# allowed address pair checks
if attr.is_attr_set(port_data.get(addr_pair.ADDRESS_PAIRS)):
if not port_security:
raise addr_pair.AddressPairAndPortSecurityRequired()
else:
self._process_create_allowed_address_pairs(
context, neutron_db,
port_data[addr_pair.ADDRESS_PAIRS])
else:
# remove ATTR_NOT_SPECIFIED
port_data[addr_pair.ADDRESS_PAIRS] = []
LOG.debug(_("create_port completed on NSX for tenant "
"%(tenant_id)s: (%(id)s)"), port_data)
self._process_portbindings_create_and_update(context,
port['port'],
port_data)
# DB Operation is complete, perform DVS operation
port_data = port['port']
self.handle_port_dhcp_access(context, port_data, action='create_port')
return port_data
def update_port(self, context, id, port):
changed_fixed_ips = 'fixed_ips' in port['port']
delete_addr_pairs = self._check_update_deletes_allowed_address_pairs(
port)
has_addr_pairs = self._check_update_has_allowed_address_pairs(port)
with context.session.begin(subtransactions=True):
ret_port = super(NsxDvsV2, self).update_port(
context, id, port)
# Save current mac learning state to check whether it's
# being updated or not
# copy values over - except fixed_ips as
# they've already been processed
port['port'].pop('fixed_ips', None)
ret_port.update(port['port'])
# populate port_security setting
if psec.PORTSECURITY not in port['port']:
ret_port[psec.PORTSECURITY] = self._get_port_security_binding(
context, id)
# validate port security and allowed address pairs
if not ret_port[psec.PORTSECURITY]:
# has address pairs in request
if has_addr_pairs:
raise addr_pair.AddressPairAndPortSecurityRequired()
elif not delete_addr_pairs:
# check if address pairs are in db
ret_port[addr_pair.ADDRESS_PAIRS] = (
self.get_allowed_address_pairs(context, id))
if ret_port[addr_pair.ADDRESS_PAIRS]:
raise addr_pair.AddressPairAndPortSecurityRequired()
if delete_addr_pairs or has_addr_pairs:
# delete address pairs and read them in
self._delete_allowed_address_pairs(context, id)
self._process_create_allowed_address_pairs(
context, ret_port, ret_port[addr_pair.ADDRESS_PAIRS])
elif changed_fixed_ips:
self._check_fixed_ips_and_address_pairs_no_overlap(context,
ret_port)
if psec.PORTSECURITY in port['port']:
self._process_port_port_security_update(
context, port['port'], ret_port)
LOG.debug(_("Updating port: %s"), port)
self._process_portbindings_create_and_update(context,
port['port'],
ret_port)
return ret_port
def delete_port(self, context, id, l3_port_check=True,
nw_gw_port_check=True):
"""Deletes a port on a specified Virtual Network.
If the port contains a remote interface attachment, the remote
interface is first un-plugged and then the port is deleted.
:returns: None
:raises: exception.PortInUse
:raises: exception.PortNotFound
:raises: exception.NetworkNotFound
"""
neutron_db_port = self.get_port(context, id)
with context.session.begin(subtransactions=True):
# metadata_dhcp_host_route
self.handle_port_metadata_access(
context, neutron_db_port, is_delete=True)
super(NsxDvsV2, self).delete_port(context, id)
self.handle_port_dhcp_access(
context, neutron_db_port, action='delete_port')
def get_router(self, context, id, fields=None):
# DVS backend cannot support logical router.
msg = (_("Unable to get info for router %s on DVS backend") % id)
raise n_exc.BadRequest(resource="router", msg=msg)
def create_router(self, context, router):
# DVS backend cannot support logical router
msg = (_("Unable to create router %s on DVS backend") %
router['router']['name'])
raise n_exc.BadRequest(resource="router", msg=msg)
def update_router(self, context, router_id, router):
# DVS backend cannot support logical router
msg = (_("Unable to update router %s on DVS backend") % router_id)
raise n_exc.BadRequest(resource="router", msg=msg)
def delete_router(self, context, router_id):
# DVS backend cannot support logical router.
msg = (_("Unable to delete router %s on DVS backend") % router_id)
raise n_exc.BadRequest(resource="router", msg=msg)
def add_router_interface(self, context, router_id, interface_info):
# DVS backend cannot support logical router
msg = _("Unable to add router interface to network on DVS backend")
raise n_exc.BadRequest(resource="router", msg=msg)
def remove_router_interface(self, context, router_id, interface_info):
# DVS backend cannot support logical router
msg = _("Unable to remove router interface to network on DVS backend")
raise n_exc.BadRequest(resource="router", msg=msg)
def delete_floatingip(self, context, id):
# DVS backend cannot support floating ips
msg = _("Cannot bind a floating ip to ports on DVS backend")
raise n_exc.BadRequest(resource="port", msg=msg)
def disassociate_floatingips(self, context, port_id):
# DVS backend cannot support floating ips
msg = _("Cannot bind a floating ip to ports on DVS backend")
raise n_exc.BadRequest(resource="port", msg=msg)
def create_security_group(self, context, security_group, default_sg=False):
raise NotImplementedError(
_("Create security group not supported for DVS"))
def update_security_group(self, context, secgroup_id, security_group):
raise NotImplementedError(
_("Update security group not supported for DVS"))
def delete_security_group(self, context, security_group_id):
raise NotImplementedError(
_("Delete security group not supported for DVS"))
def create_security_group_rule(self, context, security_group_rule):
raise NotImplementedError(
_("Create security group rule not supported for DVS"))
def create_security_group_rule_bulk(self, context, security_group_rule):
raise NotImplementedError(
_("Create security group rule not supported for DVS"))
def delete_security_group_rule(self, context, sgrid):
raise NotImplementedError(
_("Delete security group rule not supported for DVS"))

View File

@ -0,0 +1,196 @@
# Copyright (c) 2014 VMware.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import mock
from oslo.config import cfg
from neutron.common import exceptions as exp
from neutron import context
from neutron.extensions import portbindings
from neutron import manager
from neutron.openstack.common import uuidutils
from neutron.tests import base
import neutron.tests.unit.test_db_plugin as test_plugin
from vmware_nsx.neutron.plugins.vmware.common import exceptions as nsx_exc
from vmware_nsx.neutron.plugins.vmware.dbexts import db as nsx_db
from vmware_nsx.neutron.plugins.vmware.dvs import dvs
from vmware_nsx.neutron.plugins.vmware.dvs import dvs_utils
PLUGIN_NAME = 'vmware_nsx.neutron.plugins.vmware.plugin.NsxDvsPlugin'
class fake_session(object):
def __init__(self, *ret):
self._vim = mock.Mock()
def invoke_api(self, *args, **kwargs):
pass
def wait_for_task(self, task):
pass
def vim(self):
return self._vim
class DvsTestCase(base.BaseTestCase):
@mock.patch.object(dvs_utils, 'dvs_create_session',
return_value=fake_session())
@mock.patch.object(dvs.DvsManager, '_get_dvs_moref',
return_value='dvs-moref')
def setUp(self, mock_moref, mock_session):
super(DvsTestCase, self).setUp()
cfg.CONF.set_override('dvs_name', 'fake_dvs', group='dvs')
self._dvs = dvs.DvsManager()
self.assertEqual('dvs-moref', self._dvs._dvs_moref)
mock_moref.assert_called_once_with(mock_session.return_value,
'fake_dvs')
@mock.patch.object(dvs_utils, 'dvs_create_session',
return_value=fake_session())
def test_dvs_not_found(self, mock_session):
self.assertRaises(nsx_exc.DvsNotFound,
dvs.DvsManager)
@mock.patch.object(dvs.DvsManager, '_get_port_group_spec',
return_value='fake-spec')
def test_add_port_group(self, fake_get_spec):
self._dvs.add_port_group('fake-uuid', 7)
fake_get_spec.assert_called_once_with('fake-uuid', 7)
@mock.patch.object(dvs.DvsManager, '_get_port_group_spec',
return_value='fake-spec')
def test_add_port_group_with_exception(self, fake_get_spec):
with (
mock.patch.object(self._dvs._session, 'wait_for_task',
side_effect=exp.NeutronException())
):
self.assertRaises(exp.NeutronException,
self._dvs.add_port_group,
'fake-uuid', 7)
fake_get_spec.assert_called_once_with('fake-uuid', 7)
@mock.patch.object(dvs.DvsManager, '_net_id_to_moref',
return_value='fake-moref')
def test_delete_port_group(self, fake_get_moref):
self._dvs.delete_port_group('fake-uuid')
fake_get_moref.assert_called_once_with('fake-uuid')
@mock.patch.object(dvs.DvsManager, '_net_id_to_moref',
return_value='fake-moref')
def test_delete_port_group_with_exception(self, fake_get_moref):
with (
mock.patch.object(self._dvs._session, 'wait_for_task',
side_effect=exp.NeutronException())
):
self.assertRaises(exp.NeutronException,
self._dvs.delete_port_group,
'fake-uuid')
fake_get_moref.assert_called_once_with('fake-uuid')
class NeutronSimpleDvsTest(test_plugin.NeutronDbPluginV2TestCase):
@mock.patch.object(dvs_utils, 'dvs_create_session',
return_value=fake_session())
@mock.patch.object(dvs.DvsManager, '_get_dvs_moref',
return_value='dvs-moref')
def setUp(self, mock_moref, mock_session,
plugin=PLUGIN_NAME,
ext_mgr=None,
service_plugins=None):
# Ensure that DVS is enabled
cfg.CONF.set_override('host_ip', 'fake_ip', group='dvs')
cfg.CONF.set_override('host_username', 'fake_user', group='dvs')
cfg.CONF.set_override('host_password', 'fake_password', group='dvs')
cfg.CONF.set_override('dvs_name', 'fake_dvs', group='dvs')
super(NeutronSimpleDvsTest, self).setUp(plugin=PLUGIN_NAME)
self._plugin = manager.NeutronManager.get_plugin()
def _create_and_delete_dvs_network(self, network_type='flat', vlan_tag=0):
params = {'provider:network_type': network_type,
'provider:physical_network': 'dvs',
'provider:segmentation_id': vlan_tag}
params['arg_list'] = tuple(params.keys())
with contextlib.nested(
mock.patch.object(self._plugin._dvs, 'add_port_group'),
mock.patch.object(self._plugin._dvs, 'delete_port_group')
) as (mock_add, mock_delete):
with self.network(**params) as network:
ctx = context.get_admin_context()
id = network['network']['id']
dvs_id = '%s-%s' % (network['network']['name'], id)
binding = nsx_db.get_network_bindings(ctx.session, id)
self.assertIsNotNone(binding)
self.assertEqual('dvs', binding[0].phy_uuid)
if network_type == 'flat':
self.assertEqual('flat', binding[0].binding_type)
self.assertEqual(0, binding[0].vlan_id)
elif network_type == 'vlan':
self.assertEqual('vlan', binding[0].binding_type)
self.assertEqual(vlan_tag, binding[0].vlan_id)
else:
self.fail()
mock_add.assert_called_once_with(dvs_id, vlan_tag)
def test_create_and_delete_dvs_network_tag(self):
self._create_and_delete_dvs_network(network_type='vlan', vlan_tag=7)
def test_create_and_delete_dvs_network_flat(self):
self._create_and_delete_dvs_network()
def test_create_and_delete_dvs_port(self):
params = {'provider:network_type': 'vlan',
'provider:physical_network': 'dvs',
'provider:segmentation_id': 7}
params['arg_list'] = tuple(params.keys())
with contextlib.nested(
mock.patch.object(self._plugin._dvs, 'add_port_group'),
mock.patch.object(self._plugin._dvs, 'delete_port_group')
) as (mock_add, mock_delete):
with self.network(**params) as network:
with self.subnet(network) as subnet:
with self.port(subnet) as port:
self.assertEqual('dvs',
port['port'][portbindings.VIF_TYPE])
port_status = port['port']['status']
self.assertEqual(port_status, 'ACTIVE')
def test_create_router_only_dvs_backend(self):
data = {'router': {'tenant_id': 'whatever'}}
data['router']['name'] = 'router1'
data['router']['external_gateway_info'] = {'network_id': 'whatever'}
self.assertRaises(exp.BadRequest,
self._plugin.create_router,
context.get_admin_context(),
data)
def test_dvs_get_id(self):
id = uuidutils.generate_uuid()
net = {'name': '',
'id': id}
expected = id
self.assertEqual(expected, self._plugin._dvs_get_id(net))
net = {'name': 'pele',
'id': id}
expected = '%s-%s' % ('pele', id)
self.assertEqual(expected, self._plugin._dvs_get_id(net))
name = 'X' * 500
net = {'name': name,
'id': id}
expected = '%s-%s' % (name[:43], id)
self.assertEqual(expected, self._plugin._dvs_get_id(net))

View File

@ -0,0 +1,57 @@
# Copyright (c) 2014 VMware.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from oslo.config import cfg
from oslo.vmware import api
from neutron.tests import base
from vmware_nsx.neutron.plugins.vmware.dvs import dvs_utils
class DvsUtilsTestCase(base.BaseTestCase):
def test_default_configuration(self):
self.assertFalse(dvs_utils.dvs_is_enabled())
def _dvs_fake_cfg_set(self):
cfg.CONF.set_override('host_ip', 'fake_host_ip',
group='dvs')
cfg.CONF.set_override('host_username', 'fake_host_user_name',
group='dvs')
cfg.CONF.set_override('host_password', 'fake_host_pasword',
group='dvs')
cfg.CONF.set_override('dvs_name', 'fake_dvs', group='dvs')
cfg.CONF.set_override('host_port', '443', group='dvs')
def test_dvs_set(self):
self._dvs_fake_cfg_set()
self.assertTrue(dvs_utils.dvs_is_enabled())
@mock.patch.object(api.VMwareAPISession, '__init__',
return_value=None)
def test_dvs_create_session(self, fake_init):
dvs_utils.dvs_create_session()
fake_init.assert_called_once_with(cfg.CONF.dvs.host_ip,
cfg.CONF.dvs.host_username,
cfg.CONF.dvs.host_password,
cfg.CONF.dvs.api_retry_count,
cfg.CONF.dvs.task_poll_interval,
port=cfg.CONF.dvs.host_port)
def test_dvs_name_get(self):
cfg.CONF.set_override('dvs_name', 'fake-dvs', group='dvs')
self.assertEqual('fake-dvs',
dvs_utils.dvs_name_get())