Improve unit test coverage for Cisco plugin base code

Closes-Bug: #1190621

This fix improves test code coverage for the file:
neutron/plugins/cisco/network_plugin.py
from 34% to 98%.

Some of the changes made to realize this code coverage improvements
are the following:
- Core API methods (create_network, update_network, etc.) have been
  removed from the core plugin (network_plugin.py). These methods
  are currently unused. Before this change, we relied on the underlying
  model layer to inform the core plugin whether or not the model layer
  was capable of handling these core API calls itself via a 'MANAGE_STATE'
  attribute. However, there is only one existing model layer
  implementation (i.e. for Nexus plugin; the N1KV plugin does not use
  network_plugin.py), and that model layer supports the core API calls.
  Given that it is unlikely that another model layer for the Cisco Nexus
  plugin will ever be developed (esp. with the availability of the ML2
  plugin), so it makes more sense to delete this untested code.
- Exception raising for non-existent credentials has been removed from
  get_credential_details and rename_credential methods since
  exceptions are already raised for this condition in lower-level
  credential database code.
- The schedule_host, associate_port, and detach_port methods are
  deleted because these essentially do nothing useful (log a debug
  message and then return None), since these methods look for
  the same-named methods in the model layer, but these methods are
  not defined in the model layer.
- The helper functions _invoke_device_plugins and _func_name are
  deleted because they are no longer used.
- In unit test code for the Cisco QoS and credentials database, calls
  are now made indirectly through the core plugin (network_plugin.py)
  extension API methods, as a quick-and-easy way to provide code
  coverage for these core plugin methods.

Change-Id: I0c0d9e2b251a7c0355d83e495912302c5cc4d032
This commit is contained in:
Dane LeBlanc 2013-11-27 17:24:35 -05:00
parent 5d1121276b
commit d9bd9328b2
4 changed files with 89 additions and 269 deletions

View File

@ -47,7 +47,6 @@ class VirtualPhysicalSwitchModelV2(neutron_plugin_base_v2.NeutronPluginBaseV2):
following topology: following topology:
One or more servers to a nexus switch. One or more servers to a nexus switch.
""" """
MANAGE_STATE = True
__native_bulk_support = True __native_bulk_support = True
supported_extension_aliases = ["provider", "binding"] supported_extension_aliases = ["provider", "binding"]
_plugins = {} _plugins = {}

View File

@ -16,18 +16,13 @@
# #
# @author: Sumit Naiksatam, Cisco Systems, Inc. # @author: Sumit Naiksatam, Cisco Systems, Inc.
import inspect
import logging import logging
from sqlalchemy import orm
import webob.exc as wexc import webob.exc as wexc
from neutron.api.v2 import base from neutron.api.v2 import base
from neutron.common import exceptions as exc
from neutron.db import db_base_plugin_v2 from neutron.db import db_base_plugin_v2
from neutron.db import models_v2
from neutron.openstack.common import importutils from neutron.openstack.common import importutils
from neutron.plugins.cisco.common import cisco_constants as const
from neutron.plugins.cisco.common import cisco_exceptions as cexc from neutron.plugins.cisco.common import cisco_exceptions as cexc
from neutron.plugins.cisco.common import config from neutron.plugins.cisco.common import config
from neutron.plugins.cisco.db import network_db_v2 as cdb from neutron.plugins.cisco.db import network_db_v2 as cdb
@ -46,7 +41,6 @@ class PluginV2(db_base_plugin_v2.NeutronDbPluginV2):
'create_subnet', 'create_subnet',
'delete_subnet', 'update_subnet', 'delete_subnet', 'update_subnet',
'get_subnet', 'get_subnets', ] 'get_subnet', 'get_subnets', ]
_master = True
CISCO_FAULT_MAP = { CISCO_FAULT_MAP = {
cexc.NetworkSegmentIDNotFound: wexc.HTTPNotFound, cexc.NetworkSegmentIDNotFound: wexc.HTTPNotFound,
@ -69,13 +63,10 @@ class PluginV2(db_base_plugin_v2.NeutronDbPluginV2):
def __init__(self): def __init__(self):
"""Load the model class.""" """Load the model class."""
self._model = importutils.import_object(config.CISCO.model_class) self._model = importutils.import_object(config.CISCO.model_class)
if hasattr(self._model, "MANAGE_STATE") and self._model.MANAGE_STATE: native_bulk_attr_name = ("_%s__native_bulk_support"
self._master = False % self._model.__class__.__name__)
LOG.debug(_("Model %s manages state"), config.CISCO.model_class) self.__native_bulk_support = getattr(self._model,
native_bulk_attr_name = ("_%s__native_bulk_support" native_bulk_attr_name, False)
% self._model.__class__.__name__)
self.__native_bulk_support = getattr(self._model,
native_bulk_attr_name, False)
if hasattr(self._model, "supported_extension_aliases"): if hasattr(self._model, "supported_extension_aliases"):
self.supported_extension_aliases.extend( self.supported_extension_aliases.extend(
@ -89,14 +80,12 @@ class PluginV2(db_base_plugin_v2.NeutronDbPluginV2):
def __getattribute__(self, name): def __getattribute__(self, name):
"""Delegate core API calls to the model class. """Delegate core API calls to the model class.
When the configured model class offers to manage the state of the Core API calls are delegated directly to the configured model class.
logical resources, we delegate the core API calls directly to it.
Note: Bulking calls will be handled by this class, and turned into Note: Bulking calls will be handled by this class, and turned into
non-bulking calls to be considered for delegation. non-bulking calls to be considered for delegation.
""" """
master = object.__getattribute__(self, "_master")
methods = object.__getattribute__(self, "_methods_to_delegate") methods = object.__getattribute__(self, "_methods_to_delegate")
if not master and name in methods: if name in methods:
return getattr(object.__getattribute__(self, "_model"), return getattr(object.__getattribute__(self, "_model"),
name) name)
else: else:
@ -126,150 +115,6 @@ class PluginV2(db_base_plugin_v2.NeutronDbPluginV2):
""" """
base.FAULT_MAP.update(self.CISCO_FAULT_MAP) base.FAULT_MAP.update(self.CISCO_FAULT_MAP)
"""
Core API implementation
"""
def create_network(self, context, network):
"""Create new Virtual Network, and assigns it a symbolic name."""
LOG.debug(_("create_network() called"))
new_network = super(PluginV2, self).create_network(context,
network)
try:
self._invoke_device_plugins(self._func_name(), [context,
new_network])
return new_network
except Exception:
super(PluginV2, self).delete_network(context,
new_network['id'])
raise
def update_network(self, context, id, network):
"""Update network.
Updates the symbolic name belonging to a particular Virtual Network.
"""
LOG.debug(_("update_network() called"))
upd_net_dict = super(PluginV2, self).update_network(context, id,
network)
self._invoke_device_plugins(self._func_name(), [context, id,
upd_net_dict])
return upd_net_dict
def delete_network(self, context, id):
"""Delete network.
Deletes the network with the specified network identifier
belonging to the specified tenant.
"""
LOG.debug(_("delete_network() called"))
#We first need to check if there are any ports on this network
with context.session.begin():
network = self._get_network(context, id)
filter = {'network_id': [id]}
ports = self.get_ports(context, filters=filter)
# check if there are any tenant owned ports in-use
prefix = db_base_plugin_v2.AGENT_OWNER_PREFIX
only_svc = all(p['device_owner'].startswith(prefix) for p in ports)
if not only_svc:
raise exc.NetworkInUse(net_id=id)
context.session.close()
#Network does not have any ports, we can proceed to delete
network = self._get_network(context, id)
kwargs = {const.NETWORK: network,
const.BASE_PLUGIN_REF: self}
self._invoke_device_plugins(self._func_name(), [context, id,
kwargs])
return super(PluginV2, self).delete_network(context, id)
def get_network(self, context, id, fields=None):
"""Get a particular network."""
LOG.debug(_("get_network() called"))
return super(PluginV2, self).get_network(context, id, fields)
def get_networks(self, context, filters=None, fields=None):
"""Get all networks."""
LOG.debug(_("get_networks() called"))
return super(PluginV2, self).get_networks(context, filters, fields)
def create_port(self, context, port):
"""Create a port on the specified Virtual Network."""
LOG.debug(_("create_port() called"))
new_port = super(PluginV2, self).create_port(context, port)
try:
self._invoke_device_plugins(self._func_name(), [context, new_port])
return new_port
except Exception:
super(PluginV2, self).delete_port(context, new_port['id'])
raise
def delete_port(self, context, id):
LOG.debug(_("delete_port() called"))
port = self._get_port(context, id)
"""Delete port.
TODO (Sumit): Disabling this check for now, check later
Allow deleting a port only if the administrative state is down,
and its operation status is also down
#if port['admin_state_up'] or port['status'] == 'ACTIVE':
# raise exc.PortInUse(port_id=id, net_id=port['network_id'],
# att_id=port['device_id'])
"""
kwargs = {const.PORT: port}
# TODO(Sumit): Might first need to check here if port is active
self._invoke_device_plugins(self._func_name(), [context, id,
kwargs])
return super(PluginV2, self).delete_port(context, id)
def update_port(self, context, id, port):
"""Update the state of a port and return the updated port."""
LOG.debug(_("update_port() called"))
self._invoke_device_plugins(self._func_name(), [context, id,
port])
return super(PluginV2, self).update_port(context, id, port)
def create_subnet(self, context, subnet):
"""Create subnet.
Create a subnet, which represents a range of IP addresses
that can be allocated to devices.
"""
LOG.debug(_("create_subnet() called"))
new_subnet = super(PluginV2, self).create_subnet(context, subnet)
try:
self._invoke_device_plugins(self._func_name(), [context,
new_subnet])
return new_subnet
except Exception:
super(PluginV2, self).delete_subnet(context, new_subnet['id'])
raise
def update_subnet(self, context, id, subnet):
"""Updates the state of a subnet and returns the updated subnet."""
LOG.debug(_("update_subnet() called"))
self._invoke_device_plugins(self._func_name(), [context, id,
subnet])
return super(PluginV2, self).update_subnet(context, id, subnet)
def delete_subnet(self, context, id):
LOG.debug(_("delete_subnet() called"))
with context.session.begin():
subnet = self._get_subnet(context, id)
# Check if ports are using this subnet
allocated_qry = context.session.query(models_v2.IPAllocation)
allocated_qry = allocated_qry.options(orm.joinedload('ports'))
allocated = allocated_qry.filter_by(subnet_id=id)
prefix = db_base_plugin_v2.AGENT_OWNER_PREFIX
if not all(not a.port_id or a.ports.device_owner.startswith(prefix)
for a in allocated):
raise exc.SubnetInUse(subnet_id=id)
context.session.close()
kwargs = {const.SUBNET: subnet}
self._invoke_device_plugins(self._func_name(), [context, id,
kwargs])
return super(PluginV2, self).delete_subnet(context, id)
""" """
Extension API implementation Extension API implementation
""" """
@ -309,59 +154,10 @@ class PluginV2(db_base_plugin_v2.NeutronDbPluginV2):
def get_credential_details(self, credential_id): def get_credential_details(self, credential_id):
"""Get a particular credential.""" """Get a particular credential."""
LOG.debug(_("get_credential_details() called")) LOG.debug(_("get_credential_details() called"))
try: return cdb.get_credential(credential_id)
credential = cdb.get_credential(credential_id)
except exc.NotFound:
raise cexc.CredentialNotFound(credential_id=credential_id)
return credential
def rename_credential(self, credential_id, new_name): def rename_credential(self, credential_id, new_name, new_password):
"""Rename the particular credential resource.""" """Rename the particular credential resource."""
LOG.debug(_("rename_credential() called")) LOG.debug(_("rename_credential() called"))
try: return cdb.update_credential(credential_id, new_name,
credential = cdb.get_credential(credential_id) new_password=new_password)
except exc.NotFound:
raise cexc.CredentialNotFound(credential_id=credential_id)
credential = cdb.update_credential(credential_id, new_name)
return credential
def schedule_host(self, tenant_id, instance_id, instance_desc):
"""Provides the hostname on which a dynamic vnic is reserved."""
LOG.debug(_("schedule_host() called"))
host_list = self._invoke_device_plugins(self._func_name(),
[tenant_id,
instance_id,
instance_desc])
return host_list
def associate_port(self, tenant_id, instance_id, instance_desc):
"""Associate port.
Get the portprofile name and the device name for the dynamic vnic.
"""
LOG.debug(_("associate_port() called"))
return self._invoke_device_plugins(self._func_name(), [tenant_id,
instance_id,
instance_desc])
def detach_port(self, tenant_id, instance_id, instance_desc):
"""Remove the association of the VIF with the dynamic vnic."""
LOG.debug(_("detach_port() called"))
return self._invoke_device_plugins(self._func_name(), [tenant_id,
instance_id,
instance_desc])
"""
Private functions
"""
def _invoke_device_plugins(self, function_name, args):
"""Device-specific calls.
Including core API and extensions are delegated to the model.
"""
if hasattr(self._model, function_name):
return getattr(self._model, function_name)(*args)
def _func_name(self, offset=0):
"""Getting the name of the calling funciton."""
return inspect.stack()[1 + offset][3]

View File

@ -14,26 +14,46 @@
# under the License. # under the License.
import collections import collections
import mock
import testtools import testtools
from neutron.db import api as db from neutron.db import api as db
from neutron.plugins.cisco.common import cisco_exceptions as c_exc from neutron.plugins.cisco.common import cisco_exceptions as c_exc
from neutron.plugins.cisco.db import network_db_v2 as cdb from neutron.plugins.cisco.db import network_db_v2 as cdb
from neutron.plugins.cisco import network_plugin
from neutron.tests import base from neutron.tests import base
class CiscoNetworkQosDbTest(base.BaseTestCase): class CiscoNetworkDbTest(base.BaseTestCase):
"""Unit tests for cisco.db.network_models_v2.QoS model.""" """Base class for Cisco network database unit tests."""
QosObj = collections.namedtuple('QosObj', 'tenant qname desc')
def setUp(self): def setUp(self):
super(CiscoNetworkQosDbTest, self).setUp() super(CiscoNetworkDbTest, self).setUp()
db.configure_db() db.configure_db()
self.session = db.get_session() self.session = db.get_session()
# The Cisco network plugin includes a thin layer of QoS and
# credential API methods which indirectly call Cisco QoS and
# credential database access methods. For better code coverage,
# this test suite will make calls to the QoS and credential database
# access methods indirectly through the network plugin. The network
# plugin's init function can be mocked out for this purpose.
def new_network_plugin_init(instance):
pass
with mock.patch.object(network_plugin.PluginV2,
'__init__', new=new_network_plugin_init):
self._network_plugin = network_plugin.PluginV2()
self.addCleanup(db.clear_db) self.addCleanup(db.clear_db)
class CiscoNetworkQosDbTest(CiscoNetworkDbTest):
"""Unit tests for Cisco network QoS database model."""
QosObj = collections.namedtuple('QosObj', 'tenant qname desc')
def _qos_test_obj(self, tnum, qnum, desc=None): def _qos_test_obj(self, tnum, qnum, desc=None):
"""Create a Qos test object from a pair of numbers.""" """Create a Qos test object from a pair of numbers."""
if desc is None: if desc is None:
@ -49,81 +69,85 @@ class CiscoNetworkQosDbTest(base.BaseTestCase):
def test_qos_add_remove(self): def test_qos_add_remove(self):
qos11 = self._qos_test_obj(1, 1) qos11 = self._qos_test_obj(1, 1)
qos = cdb.add_qos(qos11.tenant, qos11.qname, qos11.desc) qos = self._network_plugin.create_qos(qos11.tenant, qos11.qname,
qos11.desc)
self._assert_equal(qos, qos11) self._assert_equal(qos, qos11)
qos_id = qos.qos_id qos_id = qos.qos_id
qos = cdb.remove_qos(qos11.tenant, qos_id) qos = self._network_plugin.delete_qos(qos11.tenant, qos_id)
self._assert_equal(qos, qos11) self._assert_equal(qos, qos11)
qos = cdb.remove_qos(qos11.tenant, qos_id) qos = self._network_plugin.delete_qos(qos11.tenant, qos_id)
self.assertIsNone(qos) self.assertIsNone(qos)
def test_qos_add_dup(self): def test_qos_add_dup(self):
qos22 = self._qos_test_obj(2, 2) qos22 = self._qos_test_obj(2, 2)
qos = cdb.add_qos(qos22.tenant, qos22.qname, qos22.desc) qos = self._network_plugin.create_qos(qos22.tenant, qos22.qname,
qos22.desc)
self._assert_equal(qos, qos22) self._assert_equal(qos, qos22)
qos_id = qos.qos_id qos_id = qos.qos_id
with testtools.ExpectedException(c_exc.QosNameAlreadyExists): with testtools.ExpectedException(c_exc.QosNameAlreadyExists):
cdb.add_qos(qos22.tenant, qos22.qname, "duplicate 22") self._network_plugin.create_qos(qos22.tenant, qos22.qname,
qos = cdb.remove_qos(qos22.tenant, qos_id) "duplicate 22")
qos = self._network_plugin.delete_qos(qos22.tenant, qos_id)
self._assert_equal(qos, qos22) self._assert_equal(qos, qos22)
qos = cdb.remove_qos(qos22.tenant, qos_id) qos = self._network_plugin.delete_qos(qos22.tenant, qos_id)
self.assertIsNone(qos) self.assertIsNone(qos)
def test_qos_get(self): def test_qos_get(self):
qos11 = self._qos_test_obj(1, 1) qos11 = self._qos_test_obj(1, 1)
qos11_id = cdb.add_qos(qos11.tenant, qos11.qname, qos11.desc).qos_id qos11_id = self._network_plugin.create_qos(qos11.tenant, qos11.qname,
qos11.desc).qos_id
qos21 = self._qos_test_obj(2, 1) qos21 = self._qos_test_obj(2, 1)
qos21_id = cdb.add_qos(qos21.tenant, qos21.qname, qos21.desc).qos_id qos21_id = self._network_plugin.create_qos(qos21.tenant, qos21.qname,
qos21.desc).qos_id
qos22 = self._qos_test_obj(2, 2) qos22 = self._qos_test_obj(2, 2)
qos22_id = cdb.add_qos(qos22.tenant, qos22.qname, qos22.desc).qos_id qos22_id = self._network_plugin.create_qos(qos22.tenant, qos22.qname,
qos22.desc).qos_id
qos = cdb.get_qos(qos11.tenant, qos11_id) qos = self._network_plugin.get_qos_details(qos11.tenant, qos11_id)
self._assert_equal(qos, qos11) self._assert_equal(qos, qos11)
qos = cdb.get_qos(qos21.tenant, qos21_id) qos = self._network_plugin.get_qos_details(qos21.tenant, qos21_id)
self._assert_equal(qos, qos21) self._assert_equal(qos, qos21)
qos = cdb.get_qos(qos21.tenant, qos22_id) qos = self._network_plugin.get_qos_details(qos21.tenant, qos22_id)
self._assert_equal(qos, qos22) self._assert_equal(qos, qos22)
with testtools.ExpectedException(c_exc.QosNotFound): with testtools.ExpectedException(c_exc.QosNotFound):
cdb.get_qos(qos11.tenant, "dummyQosId") self._network_plugin.get_qos_details(qos11.tenant, "dummyQosId")
with testtools.ExpectedException(c_exc.QosNotFound): with testtools.ExpectedException(c_exc.QosNotFound):
cdb.get_qos(qos11.tenant, qos21_id) self._network_plugin.get_qos_details(qos11.tenant, qos21_id)
with testtools.ExpectedException(c_exc.QosNotFound): with testtools.ExpectedException(c_exc.QosNotFound):
cdb.get_qos(qos21.tenant, qos11_id) self._network_plugin.get_qos_details(qos21.tenant, qos11_id)
qos_all_t1 = cdb.get_all_qoss(qos11.tenant) qos_all_t1 = self._network_plugin.get_all_qoss(qos11.tenant)
self.assertEqual(len(qos_all_t1), 1) self.assertEqual(len(qos_all_t1), 1)
qos_all_t2 = cdb.get_all_qoss(qos21.tenant) qos_all_t2 = self._network_plugin.get_all_qoss(qos21.tenant)
self.assertEqual(len(qos_all_t2), 2) self.assertEqual(len(qos_all_t2), 2)
qos_all_t3 = cdb.get_all_qoss("tenant3") qos_all_t3 = self._network_plugin.get_all_qoss("tenant3")
self.assertEqual(len(qos_all_t3), 0) self.assertEqual(len(qos_all_t3), 0)
def test_qos_update(self): def test_qos_update(self):
qos11 = self._qos_test_obj(1, 1) qos11 = self._qos_test_obj(1, 1)
qos11_id = cdb.add_qos(qos11.tenant, qos11.qname, qos11.desc).qos_id qos11_id = self._network_plugin.create_qos(qos11.tenant, qos11.qname,
cdb.update_qos(qos11.tenant, qos11_id) qos11.desc).qos_id
self._network_plugin.rename_qos(qos11.tenant, qos11_id,
new_name=None)
new_qname = "new qos name" new_qname = "new qos name"
new_qos = cdb.update_qos(qos11.tenant, qos11_id, new_qname) new_qos = self._network_plugin.rename_qos(qos11.tenant, qos11_id,
new_qname)
expected_qobj = self.QosObj(qos11.tenant, new_qname, qos11.desc) expected_qobj = self.QosObj(qos11.tenant, new_qname, qos11.desc)
self._assert_equal(new_qos, expected_qobj) self._assert_equal(new_qos, expected_qobj)
new_qos = cdb.get_qos(qos11.tenant, qos11_id) new_qos = self._network_plugin.get_qos_details(qos11.tenant, qos11_id)
self._assert_equal(new_qos, expected_qobj) self._assert_equal(new_qos, expected_qobj)
with testtools.ExpectedException(c_exc.QosNotFound): with testtools.ExpectedException(c_exc.QosNotFound):
cdb.update_qos(qos11.tenant, "dummyQosId") self._network_plugin.rename_qos(qos11.tenant, "dummyQosId",
new_name=None)
class CiscoNetworkCredentialDbTest(base.BaseTestCase): class CiscoNetworkCredentialDbTest(CiscoNetworkDbTest):
"""Unit tests for cisco.db.network_models_v2.Credential model.""" """Unit tests for Cisco network credentials database model."""
CredObj = collections.namedtuple('CredObj', 'cname usr pwd ctype') CredObj = collections.namedtuple('CredObj', 'cname usr pwd ctype')
def setUp(self):
super(CiscoNetworkCredentialDbTest, self).setUp()
db.configure_db()
self.session = db.get_session()
self.addCleanup(db.clear_db)
def _cred_test_obj(self, tnum, cnum): def _cred_test_obj(self, tnum, cnum):
"""Create a Credential test object from a pair of numbers.""" """Create a Credential test object from a pair of numbers."""
cname = 'credential_%s_%s' % (str(tnum), str(cnum)) cname = 'credential_%s_%s' % (str(tnum), str(cnum))
@ -174,17 +198,17 @@ class CiscoNetworkCredentialDbTest(base.BaseTestCase):
cred22_id = cdb.add_credential( cred22_id = cdb.add_credential(
cred22.cname, cred22.usr, cred22.pwd, cred22.ctype).credential_id cred22.cname, cred22.usr, cred22.pwd, cred22.ctype).credential_id
cred = cdb.get_credential(cred11_id) cred = self._network_plugin.get_credential_details(cred11_id)
self._assert_equal(cred, cred11) self._assert_equal(cred, cred11)
cred = cdb.get_credential(cred21_id) cred = self._network_plugin.get_credential_details(cred21_id)
self._assert_equal(cred, cred21) self._assert_equal(cred, cred21)
cred = cdb.get_credential(cred22_id) cred = self._network_plugin.get_credential_details(cred22_id)
self._assert_equal(cred, cred22) self._assert_equal(cred, cred22)
with testtools.ExpectedException(c_exc.CredentialNotFound): with testtools.ExpectedException(c_exc.CredentialNotFound):
cdb.get_credential("dummyCredentialId") self._network_plugin.get_credential_details("dummyCredentialId")
cred_all_t1 = cdb.get_all_credentials() cred_all_t1 = self._network_plugin.get_all_credentials()
self.assertEqual(len(cred_all_t1), 3) self.assertEqual(len(cred_all_t1), 3)
def test_credential_get_name(self): def test_credential_get_name(self):
@ -215,16 +239,23 @@ class CiscoNetworkCredentialDbTest(base.BaseTestCase):
cred11 = self._cred_test_obj(1, 1) cred11 = self._cred_test_obj(1, 1)
cred11_id = cdb.add_credential( cred11_id = cdb.add_credential(
cred11.cname, cred11.usr, cred11.pwd, cred11.ctype).credential_id cred11.cname, cred11.usr, cred11.pwd, cred11.ctype).credential_id
cdb.update_credential(cred11_id) self._network_plugin.rename_credential(cred11_id, new_name=None,
new_password=None)
new_usr = "new user name" new_usr = "new user name"
new_pwd = "new password" new_pwd = "new password"
new_credential = cdb.update_credential( new_credential = self._network_plugin.rename_credential(
cred11_id, new_usr, new_pwd) cred11_id, new_usr, new_pwd)
expected_cred = self.CredObj( expected_cred = self.CredObj(
cred11.cname, new_usr, new_pwd, cred11.ctype) cred11.cname, new_usr, new_pwd, cred11.ctype)
self._assert_equal(new_credential, expected_cred) self._assert_equal(new_credential, expected_cred)
new_credential = cdb.get_credential(cred11_id) new_credential = self._network_plugin.get_credential_details(
cred11_id)
self._assert_equal(new_credential, expected_cred) self._assert_equal(new_credential, expected_cred)
with testtools.ExpectedException(c_exc.CredentialNotFound): with testtools.ExpectedException(c_exc.CredentialNotFound):
cdb.update_credential( self._network_plugin.rename_credential(
"dummyCredentialId", new_usr, new_pwd) "dummyCredentialId", new_usr, new_pwd)
def test_get_credential_not_found_exception(self):
self.assertRaises(c_exc.CredentialNotFound,
self._network_plugin.get_credential_details,
"dummyCredentialId")

View File

@ -134,14 +134,8 @@ class CiscoNetworkPluginV2TestCase(test_db_plugin.NeutronDbPluginV2TestCase):
self.addCleanup(mock.patch.stopall) self.addCleanup(mock.patch.stopall)
def _get_plugin_ref(self): def _get_plugin_ref(self):
plugin_obj = NeutronManager.get_plugin() return getattr(NeutronManager.get_plugin(),
if getattr(plugin_obj, "_master"): "_model")._plugins[const.VSWITCH_PLUGIN]
plugin_ref = plugin_obj
else:
plugin_ref = getattr(plugin_obj, "_model").\
_plugins[const.VSWITCH_PLUGIN]
return plugin_ref
@contextlib.contextmanager @contextlib.contextmanager
def _patch_ncclient(self, attr, value): def _patch_ncclient(self, attr, value):