Merge "BigSwitch: Auto re-sync on backend inconsistencies"

This commit is contained in:
Jenkins 2014-03-06 07:09:16 +00:00 committed by Gerrit Code Review
commit b9760d93f1
11 changed files with 341 additions and 41 deletions

View File

@ -8,7 +8,8 @@
# server_auth : <username:password> (default: no auth)
# server_ssl : True | False (default: False)
# sync_data : True | False (default: False)
# server_timeout : <int> (default: 10 seconds)
# auto_sync_on_failure : True | False (default: True)
# server_timeout : <integer> (default: 10 seconds)
# neutron_id : <string> (default: neutron-<hostname>)
# add_meta_server_route : True | False (default: True)
# thread_pool_size : <int> (default: 4)
@ -25,6 +26,11 @@ servers=localhost:8080
# Sync data on connect
# sync_data=False
# If neutron fails to create a resource because the backend controller
# doesn't know of a dependency, automatically trigger a full data
# synchronization to the controller.
# auto_sync_on_failure=True
# Maximum number of seconds to wait for proxy request to connect and complete.
# server_timeout=10

View File

@ -0,0 +1,55 @@
# Copyright 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""bsn_consistencyhashes
Revision ID: 81c553f3776c
Revises: 24c7ea5160d7
Create Date: 2014-02-26 18:56:00.402855
"""
# revision identifiers, used by Alembic.
revision = '81c553f3776c'
down_revision = '24c7ea5160d7'
# Change to ['*'] if this migration applies to all plugins
migration_for_plugins = [
'neutron.plugins.bigswitch.plugin.NeutronRestProxyV2'
]
from alembic import op
import sqlalchemy as sa
from neutron.db import migration
def upgrade(active_plugins=None, options=None):
if not migration.should_run(active_plugins, migration_for_plugins):
return
op.create_table(
'consistencyhashes',
sa.Column('hash_id', sa.String(255), primary_key=True),
sa.Column('hash', sa.String(255), nullable=False)
)
def downgrade(active_plugins=None, options=None):
if not migration.should_run(active_plugins, migration_for_plugins):
return
op.drop_table('consistencyhashes')

View File

@ -44,6 +44,14 @@ restproxy_opts = [
"Floodlight controller.")),
cfg.BoolOpt('sync_data', default=False,
help=_("Sync data on connect")),
cfg.BoolOpt('auto_sync_on_failure', default=True,
help=_("If neutron fails to create a resource because "
"the backend controller doesn't know of a dependency, "
"automatically trigger a full data synchronization "
"to the controller.")),
cfg.IntOpt('consistency_interval', default=60,
help=_("Time between verifications that the backend controller "
"database is consistent with Neutron")),
cfg.IntOpt('server_timeout', default=10,
help=_("Maximum number of seconds to wait for proxy request "
"to connect and complete.")),

View File

@ -0,0 +1,56 @@
# Copyright 2014, Big Switch Networks
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy as sa
from neutron.db import api as db
from neutron.db import model_base
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
'''
A simple table to store the latest consistency hash
received from a server in case neutron gets restarted.
'''
class ConsistencyHash(model_base.BASEV2):
'''
For now we only support one global state so the
hash_id will always be '1'
'''
__tablename__ = 'consistencyhashes'
hash_id = sa.Column(sa.String(255),
primary_key=True)
hash = sa.Column(sa.String(255), nullable=False)
def get_consistency_hash(hash_id='1'):
session = db.get_session()
with session.begin(subtransactions=True):
query = session.query(ConsistencyHash)
res = query.filter_by(hash_id=hash_id).first()
if not res:
return False
return res.hash
def put_consistency_hash(hash, hash_id='1'):
session = db.get_session()
with session.begin(subtransactions=True):
conhash = ConsistencyHash(hash_id=hash_id, hash=hash)
session.merge(conhash)
LOG.debug(_("Consistency hash for group %(hash_id)s updated "
"to %(hash)s"), {'hash_id': hash_id, 'hash': hash})

View File

@ -45,6 +45,7 @@ on port-attach) on an additional PUT to do a bulk dump of all persistent data.
"""
import copy
import httplib
import re
import eventlet
@ -172,13 +173,8 @@ class NeutronRestProxyV2Base(db_base_plugin_v2.NeutronDbPluginV2,
if not self.servers:
LOG.warning(_("ServerPool not set!"))
def _send_all_data(self, send_ports=True, send_floating_ips=True,
send_routers=True):
"""Pushes all data to network ctrl (networks/ports, ports/attachments).
This gives the controller an option to re-sync it's persistent store
with neutron's current view of that data.
"""
def _get_all_data(self, get_ports=True, get_floating_ips=True,
get_routers=True):
admin_context = qcontext.get_admin_context()
networks = []
@ -186,11 +182,11 @@ class NeutronRestProxyV2Base(db_base_plugin_v2.NeutronDbPluginV2,
for net in all_networks:
mapped_network = self._get_mapped_network_with_subnets(net)
flips_n_ports = {}
if send_floating_ips:
if get_floating_ips:
flips_n_ports = self._get_network_with_floatingips(
mapped_network)
if send_ports:
if get_ports:
ports = []
net_filter = {'network_id': [net.get('id')]}
net_ports = self.get_ports(admin_context,
@ -209,12 +205,9 @@ class NeutronRestProxyV2Base(db_base_plugin_v2.NeutronDbPluginV2,
if flips_n_ports:
networks.append(flips_n_ports)
resource = '/topology'
data = {
'networks': networks,
}
data = {'networks': networks}
if send_routers:
if get_routers:
routers = []
all_routers = self.get_routers(admin_context) or []
for router in all_routers:
@ -238,9 +231,21 @@ class NeutronRestProxyV2Base(db_base_plugin_v2.NeutronDbPluginV2,
routers.append(mapped_router)
data.update({'routers': routers})
return data
def _send_all_data(self, send_ports=True, send_floating_ips=True,
send_routers=True, timeout=None,
triggered_by_tenant=None):
"""Pushes all data to network ctrl (networks/ports, ports/attachments).
This gives the controller an option to re-sync it's persistent store
with neutron's current view of that data.
"""
data = self._get_all_data(send_ports, send_floating_ips, send_routers)
data['triggered_by_tenant'] = triggered_by_tenant
errstr = _("Unable to update remote topology: %s")
return self.servers.rest_action('PUT', resource, data, errstr)
return self.servers.rest_action('PUT', servermanager.TOPOLOGY_PATH,
data, errstr, timeout=timeout)
def _get_network_with_floatingips(self, network, context=None):
if context is None:
@ -386,15 +391,38 @@ class NeutronRestProxyV2Base(db_base_plugin_v2.NeutronDbPluginV2,
try:
self.servers.rest_create_port(tenant_id, net_id, port)
except servermanager.RemoteRestError as e:
LOG.error(
_("NeutronRestProxyV2: Unable to create port: %s"), e)
try:
self._set_port_status(port['id'], const.PORT_STATUS_ERROR)
except exceptions.PortNotFound:
# If port is already gone from DB and there was an error
# creating on the backend, everything is already consistent
pass
return
# 404 should never be received on a port create unless
# there are inconsistencies between the data in neutron
# and the data in the backend.
# Run a sync to get it consistent.
if (cfg.CONF.RESTPROXY.auto_sync_on_failure and
e.status == httplib.NOT_FOUND and
servermanager.NXNETWORK in e.reason):
LOG.error(_("Iconsistency with backend controller "
"triggering full synchronization."))
# args depend on if we are operating in ML2 driver
# or as the full plugin
topoargs = self.servers.get_topo_function_args
self._send_all_data(
send_ports=topoargs['get_ports'],
send_floating_ips=topoargs['get_floating_ips'],
send_routers=topoargs['get_routers'],
triggered_by_tenant=tenant_id
)
# If the full sync worked, the port will be created
# on the controller so it can be safely marked as active
else:
# Any errors that don't result in a successful auto-sync
# require that the port be placed into the error state.
LOG.error(
_("NeutronRestProxyV2: Unable to create port: %s"), e)
try:
self._set_port_status(port['id'], const.PORT_STATUS_ERROR)
except exceptions.PortNotFound:
# If port is already gone from DB and there was an error
# creating on the backend, everything is already consistent
pass
return
new_status = (const.PORT_STATUS_ACTIVE if port['state'] == 'UP'
else const.PORT_STATUS_DOWN)
try:
@ -448,6 +476,10 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
# init network ctrl connections
self.servers = servermanager.ServerPool(server_timeout)
self.servers.get_topo_function = self._get_all_data
self.servers.get_topo_function_args = {'get_ports': True,
'get_floating_ips': True,
'get_routers': True}
self.network_scheduler = importutils.import_object(
cfg.CONF.network_scheduler_driver

View File

@ -34,13 +34,15 @@ import base64
import httplib
import json
import socket
import time
import eventlet
from oslo.config import cfg
from neutron.common import exceptions
from neutron.common import utils
from neutron.openstack.common import log as logging
from neutron.plugins.bigswitch.db import consistency_db as cdb
LOG = logging.getLogger(__name__)
@ -56,23 +58,34 @@ PORTS_PATH = "/tenants/%s/networks/%s/ports/%s"
ATTACHMENT_PATH = "/tenants/%s/networks/%s/ports/%s/attachment"
ROUTERS_PATH = "/tenants/%s/routers/%s"
ROUTER_INTF_PATH = "/tenants/%s/routers/%s/interfaces/%s"
TOPOLOGY_PATH = "/topology"
HEALTH_PATH = "/health"
SUCCESS_CODES = range(200, 207)
FAILURE_CODES = [0, 301, 302, 303, 400, 401, 403, 404, 500, 501, 502, 503,
504, 505]
BASE_URI = '/networkService/v1.1'
ORCHESTRATION_SERVICE_ID = 'Neutron v2.0'
HASH_MATCH_HEADER = 'X-BSN-BVS-HASH-MATCH'
# error messages
NXNETWORK = 'NXVNS'
class RemoteRestError(exceptions.NeutronException):
message = _("Error in REST call to remote network "
"controller: %(reason)s")
status = None
def __init__(self, **kwargs):
self.status = kwargs.pop('status', None)
self.reason = kwargs.get('reason')
super(RemoteRestError, self).__init__(**kwargs)
class ServerProxy(object):
"""REST server proxy to a network controller."""
def __init__(self, server, port, ssl, auth, neutron_id, timeout,
base_uri, name):
base_uri, name, mypool):
self.server = server
self.port = port
self.ssl = ssl
@ -84,6 +97,8 @@ class ServerProxy(object):
self.neutron_id = neutron_id
self.failed = False
self.capabilities = []
# enable server to reference parent pool
self.mypool = mypool
if auth:
self.auth = 'Basic ' + base64.encodestring(auth).strip()
@ -99,7 +114,7 @@ class ServerProxy(object):
'cap': self.capabilities})
return self.capabilities
def rest_call(self, action, resource, data='', headers=None):
def rest_call(self, action, resource, data='', headers={}, timeout=None):
uri = self.base_uri + resource
body = json.dumps(data)
if not headers:
@ -109,6 +124,7 @@ class ServerProxy(object):
headers['NeutronProxy-Agent'] = self.name
headers['Instance-ID'] = self.neutron_id
headers['Orchestration-Service-ID'] = ORCHESTRATION_SERVICE_ID
headers[HASH_MATCH_HEADER] = self.mypool.consistency_hash
if self.auth:
headers['Authorization'] = self.auth
@ -121,16 +137,17 @@ class ServerProxy(object):
'action': action})
conn = None
timeout = timeout or self.timeout
if self.ssl:
conn = httplib.HTTPSConnection(
self.server, self.port, timeout=self.timeout)
self.server, self.port, timeout=timeout)
if conn is None:
LOG.error(_('ServerProxy: Could not establish HTTPS '
'connection'))
return 0, None, None, None
else:
conn = httplib.HTTPConnection(
self.server, self.port, timeout=self.timeout)
self.server, self.port, timeout=timeout)
if conn is None:
LOG.error(_('ServerProxy: Could not establish HTTP '
'connection'))
@ -139,6 +156,9 @@ class ServerProxy(object):
try:
conn.request(action, uri, body, headers)
response = conn.getresponse()
newhash = response.getheader(HASH_MATCH_HEADER)
if newhash:
self._put_consistency_hash(newhash)
respstr = response.read()
respdata = respstr
if response.status in self.success_codes:
@ -160,6 +180,10 @@ class ServerProxy(object):
'data': ret[3]})
return ret
def _put_consistency_hash(self, newhash):
self.mypool.consistency_hash = newhash
cdb.put_consistency_hash(newhash)
class ServerPool(object):
@ -180,6 +204,17 @@ class ServerPool(object):
if timeout is not None:
self.timeout = timeout
# Function to use to retrieve topology for consistency syncs.
# Needs to be set by module that uses the servermanager.
self.get_topo_function = None
self.get_topo_function_args = {}
# Hash to send to backend with request as expected previous
# state to verify consistency.
self.consistency_hash = cdb.get_consistency_hash()
eventlet.spawn(self._consistency_watchdog,
cfg.CONF.RESTPROXY.consistency_interval)
if not servers:
raise cfg.Error(_('Servers not defined. Aborting server manager.'))
servers = [s if len(s.rsplit(':', 1)) == 2
@ -210,7 +245,7 @@ class ServerPool(object):
def server_proxy_for(self, server, port):
return ServerProxy(server, port, self.ssl, self.auth, self.neutron_id,
self.timeout, self.base_uri, self.name)
self.timeout, self.base_uri, self.name, mypool=self)
def server_failure(self, resp, ignore_codes=[]):
"""Define failure codes as required.
@ -228,10 +263,27 @@ class ServerPool(object):
return resp[0] in SUCCESS_CODES
@utils.synchronized('bsn-rest-call')
def rest_call(self, action, resource, data, headers, ignore_codes):
def rest_call(self, action, resource, data, headers, ignore_codes,
timeout=None):
good_first = sorted(self.servers, key=lambda x: x.failed)
first_response = None
for active_server in good_first:
ret = active_server.rest_call(action, resource, data, headers)
ret = active_server.rest_call(action, resource, data, headers,
timeout)
# If inconsistent, do a full synchronization
if ret[0] == httplib.CONFLICT:
if not self.get_topo_function:
raise cfg.Error(_('Server requires synchronization, '
'but no topology function was defined.'))
data = self.get_topo_function(**self.get_topo_function_args)
active_server.rest_call('PUT', TOPOLOGY_PATH, data,
timeout=None)
# Store the first response as the error to be bubbled up to the
# user since it was a good server. Subsequent servers will most
# likely be cluster slaves and won't have a useful error for the
# user (e.g. 302 redirect to master)
if not first_response:
first_response = ret
if not self.server_failure(ret, ignore_codes):
active_server.failed = False
return ret
@ -254,10 +306,10 @@ class ServerPool(object):
{'action': action,
'server': tuple((s.server,
s.port) for s in self.servers)})
return (0, None, None, None)
return first_response
def rest_action(self, action, resource, data='', errstr='%s',
ignore_codes=[], headers=None):
ignore_codes=[], headers={}, timeout=None):
"""
Wrapper for rest_call that verifies success and raises a
RemoteRestError on failure with a provided error string
@ -266,10 +318,11 @@ class ServerPool(object):
"""
if not ignore_codes and action == 'DELETE':
ignore_codes = [404]
resp = self.rest_call(action, resource, data, headers, ignore_codes)
resp = self.rest_call(action, resource, data, headers, ignore_codes,
timeout)
if self.server_failure(resp, ignore_codes):
LOG.error(errstr, resp[2])
raise RemoteRestError(reason=resp[2])
raise RemoteRestError(reason=resp[2], status=resp[0])
if resp[0] in ignore_codes:
LOG.warning(_("NeutronRestProxyV2: Received and ignored error "
"code %(code)s on %(action)s action to resource "
@ -361,3 +414,16 @@ class ServerPool(object):
resource = FLOATINGIPS_PATH % (tenant_id, oldid)
errstr = _("Unable to delete floating IP: %s")
self.rest_action('DELETE', resource, errstr=errstr)
def _consistency_watchdog(self, polling_interval=60):
if 'consistency' not in self.get_capabilities():
LOG.warning(_("Backend server(s) do not support automated "
"consitency checks."))
return
while True:
# If consistency is supported, all we have to do is make any
# rest call and the consistency header will be added. If it
# doesn't match, the backend will return a synchronization error
# that will be handled by the rest_call.
time.sleep(polling_interval)
self.servers.rest_call('GET', HEALTH_PATH)

View File

@ -16,6 +16,8 @@
# under the License.
#
# @author: Sumit Naiksatam, sumitnaiksatam@gmail.com, Big Switch Networks, Inc.
# @author: Kevin Benton, Big Switch Networks, Inc.
import eventlet
from oslo.config import cfg
@ -25,7 +27,7 @@ from neutron.openstack.common import log
from neutron.plugins.bigswitch import config as pl_config
from neutron.plugins.bigswitch.db import porttracker_db
from neutron.plugins.bigswitch.plugin import NeutronRestProxyV2Base
from neutron.plugins.bigswitch.servermanager import ServerPool
from neutron.plugins.bigswitch import servermanager
from neutron.plugins.ml2 import driver_api as api
@ -51,7 +53,11 @@ class BigSwitchMechanismDriver(NeutronRestProxyV2Base,
self.native_bulk_support = False
# init network ctrl connections
self.servers = ServerPool(server_timeout)
self.servers = servermanager.ServerPool(server_timeout)
self.servers.get_topo_function = self._get_all_data
self.servers.get_topo_function_args = {'get_ports': True,
'get_floating_ips': False,
'get_routers': False}
self.segmentation_types = ', '.join(cfg.CONF.ml2.type_drivers)
LOG.debug(_("Initialization done"))
@ -102,6 +108,8 @@ class BigSwitchMechanismDriver(NeutronRestProxyV2Base,
prepped_port = self._map_state_and_status(prepped_port)
if (portbindings.HOST_ID not in prepped_port or
prepped_port[portbindings.HOST_ID] == ''):
LOG.warning(_("Ignoring port notification to controller because "
"of missing host ID."))
# in ML2, controller doesn't care about ports without
# the host_id set
return False

View File

@ -20,6 +20,7 @@
import json
from neutron.openstack.common import log as logging
from neutron.plugins.bigswitch import servermanager
LOG = logging.getLogger(__name__)
@ -35,13 +36,16 @@ class HTTPResponseMock():
def read(self):
return "{'status': '200 OK'}"
def getheader(self, header):
return None
class HTTPResponseMock404(HTTPResponseMock):
status = 404
reason = 'Not Found'
def read(self):
return "{'status': '404 Not Found'}"
return "{'status': '%s 404 Not Found'}" % servermanager.NXNETWORK
class HTTPResponseMock500(HTTPResponseMock):
@ -99,6 +103,13 @@ class HTTPConnectionMock(object):
pass
class HTTPConnectionMock404(HTTPConnectionMock):
def __init__(self, server, port, timeout):
self.response = HTTPResponseMock404(None)
self.broken = True
class HTTPConnectionMock500(HTTPConnectionMock):
def __init__(self, server, port, timeout):

View File

@ -29,8 +29,10 @@ RESTPROXY_PKG_PATH = 'neutron.plugins.bigswitch.plugin'
NOTIFIER = 'neutron.plugins.bigswitch.plugin.AgentNotifierApi'
CALLBACKS = 'neutron.plugins.bigswitch.plugin.RestProxyCallbacks'
CERTFETCH = 'neutron.plugins.bigswitch.servermanager.ServerPool._fetch_cert'
SERVER_MANAGER = 'neutron.plugins.bigswitch.servermanager'
HTTPCON = 'httplib.HTTPConnection'
SPAWN = 'eventlet.GreenPool.spawn_n'
CWATCH = SERVER_MANAGER + '.ServerPool._consistency_watchdog'
class BigSwitchTestBase(object):
@ -50,9 +52,11 @@ class BigSwitchTestBase(object):
self.plugin_notifier_p = mock.patch(NOTIFIER)
self.callbacks_p = mock.patch(CALLBACKS)
self.spawn_p = mock.patch(SPAWN)
self.watch_p = mock.patch(CWATCH)
self.addCleanup(mock.patch.stopall)
self.addCleanup(db.clear_db)
self.callbacks_p.start()
self.plugin_notifier_p.start()
self.httpPatch.start()
self.spawn_p.start()
self.watch_p.start()

View File

@ -15,7 +15,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from mock import patch
from contextlib import nested
import mock
from oslo.config import cfg
import webob.exc
@ -29,6 +30,8 @@ from neutron.tests.unit import test_api_v2
import neutron.tests.unit.test_db_plugin as test_plugin
import neutron.tests.unit.test_extension_allowedaddresspairs as test_addr_pair
patch = mock.patch
class BigSwitchProxyPluginV2TestCase(test_base.BigSwitchTestBase,
test_plugin.NeutronDbPluginV2TestCase):
@ -150,6 +153,27 @@ class TestBigSwitchProxyPortsV2(test_plugin.TestPortsV2,
res = req.get_response(self.api)
self.assertEqual(res.status_int, 200)
def test_create404_triggers_sync(self):
# allow async port thread for this patch
self.spawn_p.stop()
with nested(
self.subnet(),
patch('httplib.HTTPConnection', create=True,
new=fake_server.HTTPConnectionMock404),
patch(test_base.RESTPROXY_PKG_PATH
+ '.NeutronRestProxyV2._send_all_data')
) as (s, mock_http, mock_send_all):
with self.port(subnet=s, device_id='somedevid') as p:
# wait for the async port thread to finish
plugin = NeutronManager.get_plugin()
plugin.evpool.waitall()
call = mock.call(
send_routers=True, send_ports=True, send_floating_ips=True,
triggered_by_tenant=p['port']['tenant_id']
)
mock_send_all.assert_has_calls([call])
self.spawn_p.start()
class TestBigSwitchProxyPortsV2IVS(test_plugin.TestPortsV2,
BigSwitchProxyPluginV2TestCase,

View File

@ -15,9 +15,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from contextlib import nested
import mock
import webob.exc
from neutron.extensions import portbindings
from neutron.manager import NeutronManager
from neutron.plugins.bigswitch import servermanager
from neutron.plugins.ml2 import config as ml2_config
from neutron.plugins.ml2.drivers import type_vlan as vlan_config
import neutron.tests.unit.bigswitch.test_restproxy_plugin as trp
@ -27,6 +31,9 @@ from neutron.tests.unit import test_db_plugin
PHYS_NET = 'physnet1'
VLAN_START = 1000
VLAN_END = 1100
SERVER_POOL = 'neutron.plugins.bigswitch.servermanager.ServerPool'
DRIVER_MOD = 'neutron.plugins.ml2.drivers.mech_bigswitch.driver'
DRIVER = DRIVER_MOD + '.BigSwitchMechanismDriver'
class TestBigSwitchMechDriverBase(trp.BigSwitchProxyPluginV2TestCase):
@ -93,3 +100,26 @@ class TestBigSwitchMechDriverPortsV2(test_db_plugin.TestPortsV2,
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)
def test_create404_triggers_background_sync(self):
# allow the async background thread to run for this test
self.spawn_p.stop()
with nested(
mock.patch(SERVER_POOL + '.rest_create_port',
side_effect=servermanager.RemoteRestError(
reason=servermanager.NXNETWORK, status=404)),
mock.patch(DRIVER + '._send_all_data'),
self.port(**{'device_id': 'devid', 'binding:host_id': 'host'})
) as (mock_http, mock_send_all, p):
# wait for thread to finish
mm = NeutronManager.get_plugin().mechanism_manager
bigdriver = mm.mech_drivers['bigswitch'].obj
bigdriver.evpool.waitall()
mock_send_all.assert_has_calls([
mock.call(
send_routers=False, send_ports=True,
send_floating_ips=False,
triggered_by_tenant=p['port']['tenant_id']
)
])
self.spawn_p.start()