Merge "Improve unit test coverage for Cisco plugin base code"

This commit is contained in:
Jenkins 2013-12-05 11:00:21 +00:00 committed by Gerrit Code Review
commit eb69fc2475
4 changed files with 89 additions and 269 deletions

View File

@ -47,7 +47,6 @@ class VirtualPhysicalSwitchModelV2(neutron_plugin_base_v2.NeutronPluginBaseV2):
following topology: following topology:
One or more servers to a nexus switch. One or more servers to a nexus switch.
""" """
MANAGE_STATE = True
__native_bulk_support = True __native_bulk_support = True
supported_extension_aliases = ["provider", "binding"] supported_extension_aliases = ["provider", "binding"]
_plugins = {} _plugins = {}

View File

@ -16,19 +16,14 @@
# #
# @author: Sumit Naiksatam, Cisco Systems, Inc. # @author: Sumit Naiksatam, Cisco Systems, Inc.
import inspect
import logging import logging
from sqlalchemy import orm
import webob.exc as wexc import webob.exc as wexc
from neutron.api import extensions as neutron_extensions from neutron.api import extensions as neutron_extensions
from neutron.api.v2 import base from neutron.api.v2 import base
from neutron.common import exceptions as exc
from neutron.db import db_base_plugin_v2 from neutron.db import db_base_plugin_v2
from neutron.db import models_v2
from neutron.openstack.common import importutils from neutron.openstack.common import importutils
from neutron.plugins.cisco.common import cisco_constants as const
from neutron.plugins.cisco.common import cisco_exceptions as cexc from neutron.plugins.cisco.common import cisco_exceptions as cexc
from neutron.plugins.cisco.common import config from neutron.plugins.cisco.common import config
from neutron.plugins.cisco.db import network_db_v2 as cdb from neutron.plugins.cisco.db import network_db_v2 as cdb
@ -48,7 +43,6 @@ class PluginV2(db_base_plugin_v2.NeutronDbPluginV2):
'create_subnet', 'create_subnet',
'delete_subnet', 'update_subnet', 'delete_subnet', 'update_subnet',
'get_subnet', 'get_subnets', ] 'get_subnet', 'get_subnets', ]
_master = True
CISCO_FAULT_MAP = { CISCO_FAULT_MAP = {
cexc.NetworkSegmentIDNotFound: wexc.HTTPNotFound, cexc.NetworkSegmentIDNotFound: wexc.HTTPNotFound,
@ -71,13 +65,10 @@ class PluginV2(db_base_plugin_v2.NeutronDbPluginV2):
def __init__(self): def __init__(self):
"""Load the model class.""" """Load the model class."""
self._model = importutils.import_object(config.CISCO.model_class) self._model = importutils.import_object(config.CISCO.model_class)
if hasattr(self._model, "MANAGE_STATE") and self._model.MANAGE_STATE: native_bulk_attr_name = ("_%s__native_bulk_support"
self._master = False % self._model.__class__.__name__)
LOG.debug(_("Model %s manages state"), config.CISCO.model_class) self.__native_bulk_support = getattr(self._model,
native_bulk_attr_name = ("_%s__native_bulk_support" native_bulk_attr_name, False)
% self._model.__class__.__name__)
self.__native_bulk_support = getattr(self._model,
native_bulk_attr_name, False)
if hasattr(self._model, "supported_extension_aliases"): if hasattr(self._model, "supported_extension_aliases"):
self.supported_extension_aliases.extend( self.supported_extension_aliases.extend(
@ -93,14 +84,12 @@ class PluginV2(db_base_plugin_v2.NeutronDbPluginV2):
def __getattribute__(self, name): def __getattribute__(self, name):
"""Delegate core API calls to the model class. """Delegate core API calls to the model class.
When the configured model class offers to manage the state of the Core API calls are delegated directly to the configured model class.
logical resources, we delegate the core API calls directly to it.
Note: Bulking calls will be handled by this class, and turned into Note: Bulking calls will be handled by this class, and turned into
non-bulking calls to be considered for delegation. non-bulking calls to be considered for delegation.
""" """
master = object.__getattribute__(self, "_master")
methods = object.__getattribute__(self, "_methods_to_delegate") methods = object.__getattribute__(self, "_methods_to_delegate")
if not master and name in methods: if name in methods:
return getattr(object.__getattribute__(self, "_model"), return getattr(object.__getattribute__(self, "_model"),
name) name)
else: else:
@ -130,150 +119,6 @@ class PluginV2(db_base_plugin_v2.NeutronDbPluginV2):
""" """
base.FAULT_MAP.update(self.CISCO_FAULT_MAP) base.FAULT_MAP.update(self.CISCO_FAULT_MAP)
"""
Core API implementation
"""
def create_network(self, context, network):
"""Create new Virtual Network, and assigns it a symbolic name."""
LOG.debug(_("create_network() called"))
new_network = super(PluginV2, self).create_network(context,
network)
try:
self._invoke_device_plugins(self._func_name(), [context,
new_network])
return new_network
except Exception:
super(PluginV2, self).delete_network(context,
new_network['id'])
raise
def update_network(self, context, id, network):
"""Update network.
Updates the symbolic name belonging to a particular Virtual Network.
"""
LOG.debug(_("update_network() called"))
upd_net_dict = super(PluginV2, self).update_network(context, id,
network)
self._invoke_device_plugins(self._func_name(), [context, id,
upd_net_dict])
return upd_net_dict
def delete_network(self, context, id):
"""Delete network.
Deletes the network with the specified network identifier
belonging to the specified tenant.
"""
LOG.debug(_("delete_network() called"))
#We first need to check if there are any ports on this network
with context.session.begin():
network = self._get_network(context, id)
filter = {'network_id': [id]}
ports = self.get_ports(context, filters=filter)
# check if there are any tenant owned ports in-use
prefix = db_base_plugin_v2.AGENT_OWNER_PREFIX
only_svc = all(p['device_owner'].startswith(prefix) for p in ports)
if not only_svc:
raise exc.NetworkInUse(net_id=id)
context.session.close()
#Network does not have any ports, we can proceed to delete
network = self._get_network(context, id)
kwargs = {const.NETWORK: network,
const.BASE_PLUGIN_REF: self}
self._invoke_device_plugins(self._func_name(), [context, id,
kwargs])
return super(PluginV2, self).delete_network(context, id)
def get_network(self, context, id, fields=None):
"""Get a particular network."""
LOG.debug(_("get_network() called"))
return super(PluginV2, self).get_network(context, id, fields)
def get_networks(self, context, filters=None, fields=None):
"""Get all networks."""
LOG.debug(_("get_networks() called"))
return super(PluginV2, self).get_networks(context, filters, fields)
def create_port(self, context, port):
"""Create a port on the specified Virtual Network."""
LOG.debug(_("create_port() called"))
new_port = super(PluginV2, self).create_port(context, port)
try:
self._invoke_device_plugins(self._func_name(), [context, new_port])
return new_port
except Exception:
super(PluginV2, self).delete_port(context, new_port['id'])
raise
def delete_port(self, context, id):
LOG.debug(_("delete_port() called"))
port = self._get_port(context, id)
"""Delete port.
TODO (Sumit): Disabling this check for now, check later
Allow deleting a port only if the administrative state is down,
and its operation status is also down
#if port['admin_state_up'] or port['status'] == 'ACTIVE':
# raise exc.PortInUse(port_id=id, net_id=port['network_id'],
# att_id=port['device_id'])
"""
kwargs = {const.PORT: port}
# TODO(Sumit): Might first need to check here if port is active
self._invoke_device_plugins(self._func_name(), [context, id,
kwargs])
return super(PluginV2, self).delete_port(context, id)
def update_port(self, context, id, port):
"""Update the state of a port and return the updated port."""
LOG.debug(_("update_port() called"))
self._invoke_device_plugins(self._func_name(), [context, id,
port])
return super(PluginV2, self).update_port(context, id, port)
def create_subnet(self, context, subnet):
"""Create subnet.
Create a subnet, which represents a range of IP addresses
that can be allocated to devices.
"""
LOG.debug(_("create_subnet() called"))
new_subnet = super(PluginV2, self).create_subnet(context, subnet)
try:
self._invoke_device_plugins(self._func_name(), [context,
new_subnet])
return new_subnet
except Exception:
super(PluginV2, self).delete_subnet(context, new_subnet['id'])
raise
def update_subnet(self, context, id, subnet):
"""Updates the state of a subnet and returns the updated subnet."""
LOG.debug(_("update_subnet() called"))
self._invoke_device_plugins(self._func_name(), [context, id,
subnet])
return super(PluginV2, self).update_subnet(context, id, subnet)
def delete_subnet(self, context, id):
LOG.debug(_("delete_subnet() called"))
with context.session.begin():
subnet = self._get_subnet(context, id)
# Check if ports are using this subnet
allocated_qry = context.session.query(models_v2.IPAllocation)
allocated_qry = allocated_qry.options(orm.joinedload('ports'))
allocated = allocated_qry.filter_by(subnet_id=id)
prefix = db_base_plugin_v2.AGENT_OWNER_PREFIX
if not all(not a.port_id or a.ports.device_owner.startswith(prefix)
for a in allocated):
raise exc.SubnetInUse(subnet_id=id)
context.session.close()
kwargs = {const.SUBNET: subnet}
self._invoke_device_plugins(self._func_name(), [context, id,
kwargs])
return super(PluginV2, self).delete_subnet(context, id)
""" """
Extension API implementation Extension API implementation
""" """
@ -313,59 +158,10 @@ class PluginV2(db_base_plugin_v2.NeutronDbPluginV2):
def get_credential_details(self, credential_id): def get_credential_details(self, credential_id):
"""Get a particular credential.""" """Get a particular credential."""
LOG.debug(_("get_credential_details() called")) LOG.debug(_("get_credential_details() called"))
try: return cdb.get_credential(credential_id)
credential = cdb.get_credential(credential_id)
except exc.NotFound:
raise cexc.CredentialNotFound(credential_id=credential_id)
return credential
def rename_credential(self, credential_id, new_name): def rename_credential(self, credential_id, new_name, new_password):
"""Rename the particular credential resource.""" """Rename the particular credential resource."""
LOG.debug(_("rename_credential() called")) LOG.debug(_("rename_credential() called"))
try: return cdb.update_credential(credential_id, new_name,
credential = cdb.get_credential(credential_id) new_password=new_password)
except exc.NotFound:
raise cexc.CredentialNotFound(credential_id=credential_id)
credential = cdb.update_credential(credential_id, new_name)
return credential
def schedule_host(self, tenant_id, instance_id, instance_desc):
"""Provides the hostname on which a dynamic vnic is reserved."""
LOG.debug(_("schedule_host() called"))
host_list = self._invoke_device_plugins(self._func_name(),
[tenant_id,
instance_id,
instance_desc])
return host_list
def associate_port(self, tenant_id, instance_id, instance_desc):
"""Associate port.
Get the portprofile name and the device name for the dynamic vnic.
"""
LOG.debug(_("associate_port() called"))
return self._invoke_device_plugins(self._func_name(), [tenant_id,
instance_id,
instance_desc])
def detach_port(self, tenant_id, instance_id, instance_desc):
"""Remove the association of the VIF with the dynamic vnic."""
LOG.debug(_("detach_port() called"))
return self._invoke_device_plugins(self._func_name(), [tenant_id,
instance_id,
instance_desc])
"""
Private functions
"""
def _invoke_device_plugins(self, function_name, args):
"""Device-specific calls.
Including core API and extensions are delegated to the model.
"""
if hasattr(self._model, function_name):
return getattr(self._model, function_name)(*args)
def _func_name(self, offset=0):
"""Getting the name of the calling funciton."""
return inspect.stack()[1 + offset][3]

View File

@ -14,26 +14,46 @@
# under the License. # under the License.
import collections import collections
import mock
import testtools import testtools
from neutron.db import api as db from neutron.db import api as db
from neutron.plugins.cisco.common import cisco_exceptions as c_exc from neutron.plugins.cisco.common import cisco_exceptions as c_exc
from neutron.plugins.cisco.db import network_db_v2 as cdb from neutron.plugins.cisco.db import network_db_v2 as cdb
from neutron.plugins.cisco import network_plugin
from neutron.tests import base from neutron.tests import base
class CiscoNetworkQosDbTest(base.BaseTestCase): class CiscoNetworkDbTest(base.BaseTestCase):
"""Unit tests for cisco.db.network_models_v2.QoS model.""" """Base class for Cisco network database unit tests."""
QosObj = collections.namedtuple('QosObj', 'tenant qname desc')
def setUp(self): def setUp(self):
super(CiscoNetworkQosDbTest, self).setUp() super(CiscoNetworkDbTest, self).setUp()
db.configure_db() db.configure_db()
self.session = db.get_session() self.session = db.get_session()
# The Cisco network plugin includes a thin layer of QoS and
# credential API methods which indirectly call Cisco QoS and
# credential database access methods. For better code coverage,
# this test suite will make calls to the QoS and credential database
# access methods indirectly through the network plugin. The network
# plugin's init function can be mocked out for this purpose.
def new_network_plugin_init(instance):
pass
with mock.patch.object(network_plugin.PluginV2,
'__init__', new=new_network_plugin_init):
self._network_plugin = network_plugin.PluginV2()
self.addCleanup(db.clear_db) self.addCleanup(db.clear_db)
class CiscoNetworkQosDbTest(CiscoNetworkDbTest):
"""Unit tests for Cisco network QoS database model."""
QosObj = collections.namedtuple('QosObj', 'tenant qname desc')
def _qos_test_obj(self, tnum, qnum, desc=None): def _qos_test_obj(self, tnum, qnum, desc=None):
"""Create a Qos test object from a pair of numbers.""" """Create a Qos test object from a pair of numbers."""
if desc is None: if desc is None:
@ -49,81 +69,85 @@ class CiscoNetworkQosDbTest(base.BaseTestCase):
def test_qos_add_remove(self): def test_qos_add_remove(self):
qos11 = self._qos_test_obj(1, 1) qos11 = self._qos_test_obj(1, 1)
qos = cdb.add_qos(qos11.tenant, qos11.qname, qos11.desc) qos = self._network_plugin.create_qos(qos11.tenant, qos11.qname,
qos11.desc)
self._assert_equal(qos, qos11) self._assert_equal(qos, qos11)
qos_id = qos.qos_id qos_id = qos.qos_id
qos = cdb.remove_qos(qos11.tenant, qos_id) qos = self._network_plugin.delete_qos(qos11.tenant, qos_id)
self._assert_equal(qos, qos11) self._assert_equal(qos, qos11)
qos = cdb.remove_qos(qos11.tenant, qos_id) qos = self._network_plugin.delete_qos(qos11.tenant, qos_id)
self.assertIsNone(qos) self.assertIsNone(qos)
def test_qos_add_dup(self): def test_qos_add_dup(self):
qos22 = self._qos_test_obj(2, 2) qos22 = self._qos_test_obj(2, 2)
qos = cdb.add_qos(qos22.tenant, qos22.qname, qos22.desc) qos = self._network_plugin.create_qos(qos22.tenant, qos22.qname,
qos22.desc)
self._assert_equal(qos, qos22) self._assert_equal(qos, qos22)
qos_id = qos.qos_id qos_id = qos.qos_id
with testtools.ExpectedException(c_exc.QosNameAlreadyExists): with testtools.ExpectedException(c_exc.QosNameAlreadyExists):
cdb.add_qos(qos22.tenant, qos22.qname, "duplicate 22") self._network_plugin.create_qos(qos22.tenant, qos22.qname,
qos = cdb.remove_qos(qos22.tenant, qos_id) "duplicate 22")
qos = self._network_plugin.delete_qos(qos22.tenant, qos_id)
self._assert_equal(qos, qos22) self._assert_equal(qos, qos22)
qos = cdb.remove_qos(qos22.tenant, qos_id) qos = self._network_plugin.delete_qos(qos22.tenant, qos_id)
self.assertIsNone(qos) self.assertIsNone(qos)
def test_qos_get(self): def test_qos_get(self):
qos11 = self._qos_test_obj(1, 1) qos11 = self._qos_test_obj(1, 1)
qos11_id = cdb.add_qos(qos11.tenant, qos11.qname, qos11.desc).qos_id qos11_id = self._network_plugin.create_qos(qos11.tenant, qos11.qname,
qos11.desc).qos_id
qos21 = self._qos_test_obj(2, 1) qos21 = self._qos_test_obj(2, 1)
qos21_id = cdb.add_qos(qos21.tenant, qos21.qname, qos21.desc).qos_id qos21_id = self._network_plugin.create_qos(qos21.tenant, qos21.qname,
qos21.desc).qos_id
qos22 = self._qos_test_obj(2, 2) qos22 = self._qos_test_obj(2, 2)
qos22_id = cdb.add_qos(qos22.tenant, qos22.qname, qos22.desc).qos_id qos22_id = self._network_plugin.create_qos(qos22.tenant, qos22.qname,
qos22.desc).qos_id
qos = cdb.get_qos(qos11.tenant, qos11_id) qos = self._network_plugin.get_qos_details(qos11.tenant, qos11_id)
self._assert_equal(qos, qos11) self._assert_equal(qos, qos11)
qos = cdb.get_qos(qos21.tenant, qos21_id) qos = self._network_plugin.get_qos_details(qos21.tenant, qos21_id)
self._assert_equal(qos, qos21) self._assert_equal(qos, qos21)
qos = cdb.get_qos(qos21.tenant, qos22_id) qos = self._network_plugin.get_qos_details(qos21.tenant, qos22_id)
self._assert_equal(qos, qos22) self._assert_equal(qos, qos22)
with testtools.ExpectedException(c_exc.QosNotFound): with testtools.ExpectedException(c_exc.QosNotFound):
cdb.get_qos(qos11.tenant, "dummyQosId") self._network_plugin.get_qos_details(qos11.tenant, "dummyQosId")
with testtools.ExpectedException(c_exc.QosNotFound): with testtools.ExpectedException(c_exc.QosNotFound):
cdb.get_qos(qos11.tenant, qos21_id) self._network_plugin.get_qos_details(qos11.tenant, qos21_id)
with testtools.ExpectedException(c_exc.QosNotFound): with testtools.ExpectedException(c_exc.QosNotFound):
cdb.get_qos(qos21.tenant, qos11_id) self._network_plugin.get_qos_details(qos21.tenant, qos11_id)
qos_all_t1 = cdb.get_all_qoss(qos11.tenant) qos_all_t1 = self._network_plugin.get_all_qoss(qos11.tenant)
self.assertEqual(len(qos_all_t1), 1) self.assertEqual(len(qos_all_t1), 1)
qos_all_t2 = cdb.get_all_qoss(qos21.tenant) qos_all_t2 = self._network_plugin.get_all_qoss(qos21.tenant)
self.assertEqual(len(qos_all_t2), 2) self.assertEqual(len(qos_all_t2), 2)
qos_all_t3 = cdb.get_all_qoss("tenant3") qos_all_t3 = self._network_plugin.get_all_qoss("tenant3")
self.assertEqual(len(qos_all_t3), 0) self.assertEqual(len(qos_all_t3), 0)
def test_qos_update(self): def test_qos_update(self):
qos11 = self._qos_test_obj(1, 1) qos11 = self._qos_test_obj(1, 1)
qos11_id = cdb.add_qos(qos11.tenant, qos11.qname, qos11.desc).qos_id qos11_id = self._network_plugin.create_qos(qos11.tenant, qos11.qname,
cdb.update_qos(qos11.tenant, qos11_id) qos11.desc).qos_id
self._network_plugin.rename_qos(qos11.tenant, qos11_id,
new_name=None)
new_qname = "new qos name" new_qname = "new qos name"
new_qos = cdb.update_qos(qos11.tenant, qos11_id, new_qname) new_qos = self._network_plugin.rename_qos(qos11.tenant, qos11_id,
new_qname)
expected_qobj = self.QosObj(qos11.tenant, new_qname, qos11.desc) expected_qobj = self.QosObj(qos11.tenant, new_qname, qos11.desc)
self._assert_equal(new_qos, expected_qobj) self._assert_equal(new_qos, expected_qobj)
new_qos = cdb.get_qos(qos11.tenant, qos11_id) new_qos = self._network_plugin.get_qos_details(qos11.tenant, qos11_id)
self._assert_equal(new_qos, expected_qobj) self._assert_equal(new_qos, expected_qobj)
with testtools.ExpectedException(c_exc.QosNotFound): with testtools.ExpectedException(c_exc.QosNotFound):
cdb.update_qos(qos11.tenant, "dummyQosId") self._network_plugin.rename_qos(qos11.tenant, "dummyQosId",
new_name=None)
class CiscoNetworkCredentialDbTest(base.BaseTestCase): class CiscoNetworkCredentialDbTest(CiscoNetworkDbTest):
"""Unit tests for cisco.db.network_models_v2.Credential model.""" """Unit tests for Cisco network credentials database model."""
CredObj = collections.namedtuple('CredObj', 'cname usr pwd ctype') CredObj = collections.namedtuple('CredObj', 'cname usr pwd ctype')
def setUp(self):
super(CiscoNetworkCredentialDbTest, self).setUp()
db.configure_db()
self.session = db.get_session()
self.addCleanup(db.clear_db)
def _cred_test_obj(self, tnum, cnum): def _cred_test_obj(self, tnum, cnum):
"""Create a Credential test object from a pair of numbers.""" """Create a Credential test object from a pair of numbers."""
cname = 'credential_%s_%s' % (str(tnum), str(cnum)) cname = 'credential_%s_%s' % (str(tnum), str(cnum))
@ -174,17 +198,17 @@ class CiscoNetworkCredentialDbTest(base.BaseTestCase):
cred22_id = cdb.add_credential( cred22_id = cdb.add_credential(
cred22.cname, cred22.usr, cred22.pwd, cred22.ctype).credential_id cred22.cname, cred22.usr, cred22.pwd, cred22.ctype).credential_id
cred = cdb.get_credential(cred11_id) cred = self._network_plugin.get_credential_details(cred11_id)
self._assert_equal(cred, cred11) self._assert_equal(cred, cred11)
cred = cdb.get_credential(cred21_id) cred = self._network_plugin.get_credential_details(cred21_id)
self._assert_equal(cred, cred21) self._assert_equal(cred, cred21)
cred = cdb.get_credential(cred22_id) cred = self._network_plugin.get_credential_details(cred22_id)
self._assert_equal(cred, cred22) self._assert_equal(cred, cred22)
with testtools.ExpectedException(c_exc.CredentialNotFound): with testtools.ExpectedException(c_exc.CredentialNotFound):
cdb.get_credential("dummyCredentialId") self._network_plugin.get_credential_details("dummyCredentialId")
cred_all_t1 = cdb.get_all_credentials() cred_all_t1 = self._network_plugin.get_all_credentials()
self.assertEqual(len(cred_all_t1), 3) self.assertEqual(len(cred_all_t1), 3)
def test_credential_get_name(self): def test_credential_get_name(self):
@ -215,16 +239,23 @@ class CiscoNetworkCredentialDbTest(base.BaseTestCase):
cred11 = self._cred_test_obj(1, 1) cred11 = self._cred_test_obj(1, 1)
cred11_id = cdb.add_credential( cred11_id = cdb.add_credential(
cred11.cname, cred11.usr, cred11.pwd, cred11.ctype).credential_id cred11.cname, cred11.usr, cred11.pwd, cred11.ctype).credential_id
cdb.update_credential(cred11_id) self._network_plugin.rename_credential(cred11_id, new_name=None,
new_password=None)
new_usr = "new user name" new_usr = "new user name"
new_pwd = "new password" new_pwd = "new password"
new_credential = cdb.update_credential( new_credential = self._network_plugin.rename_credential(
cred11_id, new_usr, new_pwd) cred11_id, new_usr, new_pwd)
expected_cred = self.CredObj( expected_cred = self.CredObj(
cred11.cname, new_usr, new_pwd, cred11.ctype) cred11.cname, new_usr, new_pwd, cred11.ctype)
self._assert_equal(new_credential, expected_cred) self._assert_equal(new_credential, expected_cred)
new_credential = cdb.get_credential(cred11_id) new_credential = self._network_plugin.get_credential_details(
cred11_id)
self._assert_equal(new_credential, expected_cred) self._assert_equal(new_credential, expected_cred)
with testtools.ExpectedException(c_exc.CredentialNotFound): with testtools.ExpectedException(c_exc.CredentialNotFound):
cdb.update_credential( self._network_plugin.rename_credential(
"dummyCredentialId", new_usr, new_pwd) "dummyCredentialId", new_usr, new_pwd)
def test_get_credential_not_found_exception(self):
self.assertRaises(c_exc.CredentialNotFound,
self._network_plugin.get_credential_details,
"dummyCredentialId")

View File

@ -134,14 +134,8 @@ class CiscoNetworkPluginV2TestCase(test_db_plugin.NeutronDbPluginV2TestCase):
self.addCleanup(mock.patch.stopall) self.addCleanup(mock.patch.stopall)
def _get_plugin_ref(self): def _get_plugin_ref(self):
plugin_obj = NeutronManager.get_plugin() return getattr(NeutronManager.get_plugin(),
if getattr(plugin_obj, "_master"): "_model")._plugins[const.VSWITCH_PLUGIN]
plugin_ref = plugin_obj
else:
plugin_ref = getattr(plugin_obj, "_model").\
_plugins[const.VSWITCH_PLUGIN]
return plugin_ref
@contextlib.contextmanager @contextlib.contextmanager
def _patch_ncclient(self, attr, value): def _patch_ncclient(self, attr, value):