BigSwitch: Asynchronous rest calls for port create

Makes rest calls for port creation an async
operation so create_port calls immediately
return in a BUILD state.

Implements: blueprint bsn-port-async
Change-Id: Ib512a846fa878ec33205df08a3b2464b7ea0941a
This commit is contained in:
Kevin Benton 2014-02-18 22:18:47 +00:00
parent ac5a75eae8
commit 5779340446
6 changed files with 90 additions and 37 deletions

View File

@ -8,10 +8,10 @@
# server_auth : <username:password> (default: no auth) # server_auth : <username:password> (default: no auth)
# server_ssl : True | False (default: False) # server_ssl : True | False (default: False)
# sync_data : True | False (default: False) # sync_data : True | False (default: False)
# server_timeout : 10 (default: 10 seconds) # server_timeout : <int> (default: 10 seconds)
# neutron_id : <string> (default: neutron-<hostname>) # neutron_id : <string> (default: neutron-<hostname>)
# add_meta_server_route : True | False (default: True) # add_meta_server_route : True | False (default: True)
# # thread_pool_size : <int> (default: 4)
# A comma separated list of BigSwitch or Floodlight servers and port numbers. The plugin proxies the requests to the BigSwitch/Floodlight server, which performs the networking configuration. Note that only one server is needed per deployment, but you may wish to deploy multiple servers to support failover. # A comma separated list of BigSwitch or Floodlight servers and port numbers. The plugin proxies the requests to the BigSwitch/Floodlight server, which performs the networking configuration. Note that only one server is needed per deployment, but you may wish to deploy multiple servers to support failover.
servers=localhost:8080 servers=localhost:8080
@ -34,6 +34,9 @@ servers=localhost:8080
# Flag to decide if a route to the metadata server should be injected into the VM # Flag to decide if a route to the metadata server should be injected into the VM
# add_meta_server_route = True # add_meta_server_route = True
# Number of threads to use to handle large volumes of port creation requests
# thread_pool_size = 4
[nova] [nova]
# Specify the VIF_TYPE that will be controlled on the Nova compute instances # Specify the VIF_TYPE that will be controlled on the Nova compute instances
# options: ivs or ovs # options: ivs or ovs

View File

@ -47,6 +47,9 @@ restproxy_opts = [
cfg.IntOpt('server_timeout', default=10, cfg.IntOpt('server_timeout', default=10,
help=_("Maximum number of seconds to wait for proxy request " help=_("Maximum number of seconds to wait for proxy request "
"to connect and complete.")), "to connect and complete.")),
cfg.IntOpt('thread_pool_size', default=4,
help=_("Maximum number of threads to spawn to handle large "
"volumes of port creations.")),
cfg.StrOpt('neutron_id', default='neutron-' + utils.get_hostname(), cfg.StrOpt('neutron_id', default='neutron-' + utils.get_hostname(),
deprecated_name='quantum_id', deprecated_name='quantum_id',
help=_("User defined identifier for this Neutron deployment")), help=_("User defined identifier for this Neutron deployment")),

View File

@ -47,7 +47,9 @@ on port-attach) on an additional PUT to do a bulk dump of all persistent data.
import copy import copy
import re import re
import eventlet
from oslo.config import cfg from oslo.config import cfg
from sqlalchemy.orm import exc as sqlexc
from neutron.agent import securitygroups_rpc as sg_rpc from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api import extensions as neutron_extensions from neutron.api import extensions as neutron_extensions
@ -86,7 +88,6 @@ from neutron.plugins.bigswitch.version import version_string_with_vcs
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
SYNTAX_ERROR_MESSAGE = _('Syntax error in server config file, aborting plugin') SYNTAX_ERROR_MESSAGE = _('Syntax error in server config file, aborting plugin')
METADATA_SERVER_IP = '169.254.169.254' METADATA_SERVER_IP = '169.254.169.254'
@ -379,6 +380,38 @@ class NeutronRestProxyV2Base(db_base_plugin_v2.NeutronDbPluginV2,
self).get_network(context, port["network_id"]) self).get_network(context, port["network_id"])
return net['tenant_id'] return net['tenant_id']
def async_port_create(self, tenant_id, net_id, port):
try:
self.servers.rest_create_port(tenant_id, net_id, port)
except servermanager.RemoteRestError as e:
LOG.error(
_("NeutronRestProxyV2: Unable to create port: %s"), e)
try:
self._set_port_status(port['id'], const.PORT_STATUS_ERROR)
except exceptions.PortNotFound:
# If port is already gone from DB and there was an error
# creating on the backend, everything is already consistent
pass
return
new_status = (const.PORT_STATUS_ACTIVE if port['state'] == 'UP'
else const.PORT_STATUS_DOWN)
try:
self._set_port_status(port['id'], new_status)
except exceptions.PortNotFound:
# This port was deleted before the create made it to the controller
# so it now needs to be deleted since the normal delete request
# would have deleted an non-existent port.
self.servers.rest_delete_port(tenant_id, net_id, port['id'])
def _set_port_status(self, port_id, status):
session = db.get_session()
try:
port = session.query(models_v2.Port).filter_by(id=port_id).one()
port['status'] = status
session.flush()
except sqlexc.NoResultFound:
raise exceptions.PortNotFound(port_id=port_id)
class NeutronRestProxyV2(NeutronRestProxyV2Base, class NeutronRestProxyV2(NeutronRestProxyV2Base,
extradhcpopt_db.ExtraDhcpOptMixin, extradhcpopt_db.ExtraDhcpOptMixin,
@ -403,6 +436,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
LOG.info(_('NeutronRestProxy: Starting plugin. Version=%s'), LOG.info(_('NeutronRestProxy: Starting plugin. Version=%s'),
version_string_with_vcs()) version_string_with_vcs())
pl_config.register_config() pl_config.register_config()
self.evpool = eventlet.GreenPool(cfg.CONF.RESTPROXY.thread_pool_size)
# Include the BigSwitch Extensions path in the api_extensions # Include the BigSwitch Extensions path in the api_extensions
neutron_extensions.append_api_extensions_path(extensions.__path__) neutron_extensions.append_api_extensions_path(extensions.__path__)
@ -584,29 +618,31 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
with context.session.begin(subtransactions=True): with context.session.begin(subtransactions=True):
self._ensure_default_security_group_on_port(context, port) self._ensure_default_security_group_on_port(context, port)
sgids = self._get_security_groups_on_port(context, port) sgids = self._get_security_groups_on_port(context, port)
# set port status to pending. updated after rest call completes
port['port']['status'] = const.PORT_STATUS_BUILD
dhcp_opts = port['port'].get(edo_ext.EXTRADHCPOPTS, []) dhcp_opts = port['port'].get(edo_ext.EXTRADHCPOPTS, [])
new_port = super(NeutronRestProxyV2, self).create_port(context, new_port = super(NeutronRestProxyV2, self).create_port(context,
port) port)
if (portbindings.HOST_ID in port['port']
and 'id' in new_port):
host_id = port['port'][portbindings.HOST_ID]
porttracker_db.put_port_hostid(context, new_port['id'],
host_id)
self._process_port_create_extra_dhcp_opts(context, new_port,
dhcp_opts)
new_port = self._extend_port_dict_binding(context, new_port)
net_tenant_id = self._get_port_net_tenantid(context, new_port)
if self.add_meta_server_route:
if new_port['device_owner'] == 'network:dhcp':
destination = METADATA_SERVER_IP + '/32'
self._add_host_route(context, destination, new_port)
# create on network ctrl
mapped_port = self._map_state_and_status(new_port)
self.servers.rest_create_port(net_tenant_id,
new_port["network_id"],
mapped_port)
self._process_port_create_security_group(context, new_port, sgids) self._process_port_create_security_group(context, new_port, sgids)
if (portbindings.HOST_ID in port['port']
and 'id' in new_port):
host_id = port['port'][portbindings.HOST_ID]
porttracker_db.put_port_hostid(context, new_port['id'],
host_id)
self._process_port_create_extra_dhcp_opts(context, new_port,
dhcp_opts)
new_port = self._extend_port_dict_binding(context, new_port)
net = super(NeutronRestProxyV2,
self).get_network(context, new_port["network_id"])
if self.add_meta_server_route:
if new_port['device_owner'] == 'network:dhcp':
destination = METADATA_SERVER_IP + '/32'
self._add_host_route(context, destination, new_port)
# create on network ctrl
mapped_port = self._map_state_and_status(new_port)
self.evpool.spawn_n(self.async_port_create, net["tenant_id"],
new_port["network_id"], mapped_port)
self.notify_security_groups_member_updated(context, new_port) self.notify_security_groups_member_updated(context, new_port)
return new_port return new_port

View File

@ -16,7 +16,7 @@
# under the License. # under the License.
# #
# @author: Sumit Naiksatam, sumitnaiksatam@gmail.com, Big Switch Networks, Inc. # @author: Sumit Naiksatam, sumitnaiksatam@gmail.com, Big Switch Networks, Inc.
import eventlet
from oslo.config import cfg from oslo.config import cfg
from neutron import context as ctx from neutron import context as ctx
@ -46,6 +46,7 @@ class BigSwitchMechanismDriver(NeutronRestProxyV2Base,
# register plugin config opts # register plugin config opts
pl_config.register_config() pl_config.register_config()
self.evpool = eventlet.GreenPool(cfg.CONF.RESTPROXY.thread_pool_size)
# backend doesn't support bulk operations yet # backend doesn't support bulk operations yet
self.native_bulk_support = False self.native_bulk_support = False
@ -70,8 +71,8 @@ class BigSwitchMechanismDriver(NeutronRestProxyV2Base,
# create port on the network controller # create port on the network controller
port = self._prepare_port_for_controller(context) port = self._prepare_port_for_controller(context)
if port: if port:
self.servers.rest_create_port(port["network"]["tenant_id"], self.async_port_create(port["network"]["tenant_id"],
port["network"]["id"], port) port["network"]["id"], port)
def update_port_postcommit(self, context): def update_port_postcommit(self, context):
# update port on the network controller # update port on the network controller

View File

@ -30,6 +30,7 @@ NOTIFIER = 'neutron.plugins.bigswitch.plugin.AgentNotifierApi'
CALLBACKS = 'neutron.plugins.bigswitch.plugin.RestProxyCallbacks' CALLBACKS = 'neutron.plugins.bigswitch.plugin.RestProxyCallbacks'
CERTFETCH = 'neutron.plugins.bigswitch.servermanager.ServerPool._fetch_cert' CERTFETCH = 'neutron.plugins.bigswitch.servermanager.ServerPool._fetch_cert'
HTTPCON = 'httplib.HTTPConnection' HTTPCON = 'httplib.HTTPConnection'
SPAWN = 'eventlet.GreenPool.spawn_n'
class BigSwitchTestBase(object): class BigSwitchTestBase(object):
@ -48,8 +49,10 @@ class BigSwitchTestBase(object):
new=fake_server.HTTPConnectionMock) new=fake_server.HTTPConnectionMock)
self.plugin_notifier_p = mock.patch(NOTIFIER) self.plugin_notifier_p = mock.patch(NOTIFIER)
self.callbacks_p = mock.patch(CALLBACKS) self.callbacks_p = mock.patch(CALLBACKS)
self.spawn_p = mock.patch(SPAWN)
self.addCleanup(mock.patch.stopall) self.addCleanup(mock.patch.stopall)
self.addCleanup(db.clear_db) self.addCleanup(db.clear_db)
self.callbacks_p.start() self.callbacks_p.start()
self.plugin_notifier_p.start() self.plugin_notifier_p.start()
self.httpPatch.start() self.httpPatch.start()
self.spawn_p.start()

View File

@ -39,6 +39,7 @@ class BigSwitchProxyPluginV2TestCase(test_base.BigSwitchTestBase,
self._plugin_name = plugin_name self._plugin_name = plugin_name
super(BigSwitchProxyPluginV2TestCase, super(BigSwitchProxyPluginV2TestCase,
self).setUp(self._plugin_name) self).setUp(self._plugin_name)
self.port_create_status = 'BUILD'
class TestBigSwitchProxyBasicGet(test_plugin.TestBasicGet, class TestBigSwitchProxyBasicGet(test_plugin.TestBasicGet,
@ -67,25 +68,31 @@ class TestBigSwitchProxyPortsV2(test_plugin.TestPortsV2,
VIF_TYPE = portbindings.VIF_TYPE_OVS VIF_TYPE = portbindings.VIF_TYPE_OVS
HAS_PORT_FILTER = False HAS_PORT_FILTER = False
def test_update_port_status_build(self):
with self.port() as port:
self.assertEqual(port['port']['status'], 'BUILD')
self.assertEqual(self.port_create_status, 'BUILD')
def _get_ports(self, netid): def _get_ports(self, netid):
return self.deserialize('json', return self.deserialize('json',
self._list_ports('json', netid=netid))['ports'] self._list_ports('json', netid=netid))['ports']
def test_rollback_for_port_create(self): def test_rollback_for_port_create(self):
with self.network(no_delete=True) as n: plugin = NeutronManager.get_plugin()
with self.subnet() as s:
self.httpPatch = patch('httplib.HTTPConnection', create=True, self.httpPatch = patch('httplib.HTTPConnection', create=True,
new=fake_server.HTTPConnectionMock500) new=fake_server.HTTPConnectionMock500)
self.httpPatch.start() self.httpPatch.start()
kwargs = {'device_id': 'somedevid', kwargs = {'device_id': 'somedevid'}
'tenant_id': n['network']['tenant_id']} # allow thread spawns for this patch
self._create_port('json', n['network']['id'], self.spawn_p.stop()
expected_code= with self.port(subnet=s, **kwargs):
webob.exc.HTTPInternalServerError.code, self.spawn_p.start()
**kwargs) plugin.evpool.waitall()
self.httpPatch.stop() self.httpPatch.stop()
ports = self._get_ports(n['network']['id']) ports = self._get_ports(s['subnet']['network_id'])
#failure to create should result in no ports #failure to create should result in port in error state
self.assertEqual(0, len(ports)) self.assertEqual(ports[0]['status'], 'ERROR')
def test_rollback_for_port_update(self): def test_rollback_for_port_update(self):
with self.network() as n: with self.network() as n:
@ -116,7 +123,7 @@ class TestBigSwitchProxyPortsV2(test_plugin.TestPortsV2,
webob.exc.HTTPInternalServerError.code) webob.exc.HTTPInternalServerError.code)
self.httpPatch.stop() self.httpPatch.stop()
port = self._get_ports(n['network']['id'])[0] port = self._get_ports(n['network']['id'])[0]
self.assertEqual('ACTIVE', port['status']) self.assertEqual('BUILD', port['status'])
def test_correct_shared_net_tenant_id(self): def test_correct_shared_net_tenant_id(self):
# tenant_id in port requests should match network tenant_id instead # tenant_id in port requests should match network tenant_id instead