BigSwitch: Auto re-sync on backend inconsistencies
If the controller supports it, pass a hash to the controller indicating the expected state that a REST transaction is updating. If the state is inconsistent, the controller will return an error indicating a conflict and the plugin/driver will trigger a full synchronization. For controllers that don't support the consistency hash, trigger a full background synchronization if the plugin tries to create a port and receives a 404 error due to the parent network not existing. Implements: blueprint bsn-auto-resync Change-Id: I07c92b011453f6bf81b8ee12661170817287cdd7
This commit is contained in:
parent
fd8e37e221
commit
095aa20e3e
@ -8,7 +8,8 @@
|
|||||||
# server_auth : <username:password> (default: no auth)
|
# server_auth : <username:password> (default: no auth)
|
||||||
# server_ssl : True | False (default: False)
|
# server_ssl : True | False (default: False)
|
||||||
# sync_data : True | False (default: False)
|
# sync_data : True | False (default: False)
|
||||||
# server_timeout : <int> (default: 10 seconds)
|
# auto_sync_on_failure : True | False (default: True)
|
||||||
|
# server_timeout : <integer> (default: 10 seconds)
|
||||||
# neutron_id : <string> (default: neutron-<hostname>)
|
# neutron_id : <string> (default: neutron-<hostname>)
|
||||||
# add_meta_server_route : True | False (default: True)
|
# add_meta_server_route : True | False (default: True)
|
||||||
# thread_pool_size : <int> (default: 4)
|
# thread_pool_size : <int> (default: 4)
|
||||||
@ -25,6 +26,11 @@ servers=localhost:8080
|
|||||||
# Sync data on connect
|
# Sync data on connect
|
||||||
# sync_data=False
|
# sync_data=False
|
||||||
|
|
||||||
|
# If neutron fails to create a resource because the backend controller
|
||||||
|
# doesn't know of a dependency, automatically trigger a full data
|
||||||
|
# synchronization to the controller.
|
||||||
|
# auto_sync_on_failure=True
|
||||||
|
|
||||||
# Maximum number of seconds to wait for proxy request to connect and complete.
|
# Maximum number of seconds to wait for proxy request to connect and complete.
|
||||||
# server_timeout=10
|
# server_timeout=10
|
||||||
|
|
||||||
|
@ -0,0 +1,55 @@
|
|||||||
|
# Copyright 2014 OpenStack Foundation
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
#
|
||||||
|
|
||||||
|
"""bsn_consistencyhashes
|
||||||
|
|
||||||
|
Revision ID: 81c553f3776c
|
||||||
|
Revises: 24c7ea5160d7
|
||||||
|
Create Date: 2014-02-26 18:56:00.402855
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision = '81c553f3776c'
|
||||||
|
down_revision = '24c7ea5160d7'
|
||||||
|
|
||||||
|
# Change to ['*'] if this migration applies to all plugins
|
||||||
|
|
||||||
|
migration_for_plugins = [
|
||||||
|
'neutron.plugins.bigswitch.plugin.NeutronRestProxyV2'
|
||||||
|
]
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
from neutron.db import migration
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade(active_plugins=None, options=None):
|
||||||
|
if not migration.should_run(active_plugins, migration_for_plugins):
|
||||||
|
return
|
||||||
|
|
||||||
|
op.create_table(
|
||||||
|
'consistencyhashes',
|
||||||
|
sa.Column('hash_id', sa.String(255), primary_key=True),
|
||||||
|
sa.Column('hash', sa.String(255), nullable=False)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade(active_plugins=None, options=None):
|
||||||
|
if not migration.should_run(active_plugins, migration_for_plugins):
|
||||||
|
return
|
||||||
|
|
||||||
|
op.drop_table('consistencyhashes')
|
@ -44,6 +44,14 @@ restproxy_opts = [
|
|||||||
"Floodlight controller.")),
|
"Floodlight controller.")),
|
||||||
cfg.BoolOpt('sync_data', default=False,
|
cfg.BoolOpt('sync_data', default=False,
|
||||||
help=_("Sync data on connect")),
|
help=_("Sync data on connect")),
|
||||||
|
cfg.BoolOpt('auto_sync_on_failure', default=True,
|
||||||
|
help=_("If neutron fails to create a resource because "
|
||||||
|
"the backend controller doesn't know of a dependency, "
|
||||||
|
"automatically trigger a full data synchronization "
|
||||||
|
"to the controller.")),
|
||||||
|
cfg.IntOpt('consistency_interval', default=60,
|
||||||
|
help=_("Time between verifications that the backend controller "
|
||||||
|
"database is consistent with Neutron")),
|
||||||
cfg.IntOpt('server_timeout', default=10,
|
cfg.IntOpt('server_timeout', default=10,
|
||||||
help=_("Maximum number of seconds to wait for proxy request "
|
help=_("Maximum number of seconds to wait for proxy request "
|
||||||
"to connect and complete.")),
|
"to connect and complete.")),
|
||||||
|
56
neutron/plugins/bigswitch/db/consistency_db.py
Normal file
56
neutron/plugins/bigswitch/db/consistency_db.py
Normal file
@ -0,0 +1,56 @@
|
|||||||
|
# Copyright 2014, Big Switch Networks
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
from neutron.db import api as db
|
||||||
|
from neutron.db import model_base
|
||||||
|
from neutron.openstack.common import log as logging
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
'''
|
||||||
|
A simple table to store the latest consistency hash
|
||||||
|
received from a server in case neutron gets restarted.
|
||||||
|
'''
|
||||||
|
|
||||||
|
|
||||||
|
class ConsistencyHash(model_base.BASEV2):
|
||||||
|
'''
|
||||||
|
For now we only support one global state so the
|
||||||
|
hash_id will always be '1'
|
||||||
|
'''
|
||||||
|
__tablename__ = 'consistencyhashes'
|
||||||
|
hash_id = sa.Column(sa.String(255),
|
||||||
|
primary_key=True)
|
||||||
|
hash = sa.Column(sa.String(255), nullable=False)
|
||||||
|
|
||||||
|
|
||||||
|
def get_consistency_hash(hash_id='1'):
|
||||||
|
session = db.get_session()
|
||||||
|
with session.begin(subtransactions=True):
|
||||||
|
query = session.query(ConsistencyHash)
|
||||||
|
res = query.filter_by(hash_id=hash_id).first()
|
||||||
|
if not res:
|
||||||
|
return False
|
||||||
|
return res.hash
|
||||||
|
|
||||||
|
|
||||||
|
def put_consistency_hash(hash, hash_id='1'):
|
||||||
|
session = db.get_session()
|
||||||
|
with session.begin(subtransactions=True):
|
||||||
|
conhash = ConsistencyHash(hash_id=hash_id, hash=hash)
|
||||||
|
session.merge(conhash)
|
||||||
|
LOG.debug(_("Consistency hash for group %(hash_id)s updated "
|
||||||
|
"to %(hash)s"), {'hash_id': hash_id, 'hash': hash})
|
@ -45,6 +45,7 @@ on port-attach) on an additional PUT to do a bulk dump of all persistent data.
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import copy
|
import copy
|
||||||
|
import httplib
|
||||||
import re
|
import re
|
||||||
|
|
||||||
import eventlet
|
import eventlet
|
||||||
@ -172,13 +173,8 @@ class NeutronRestProxyV2Base(db_base_plugin_v2.NeutronDbPluginV2,
|
|||||||
if not self.servers:
|
if not self.servers:
|
||||||
LOG.warning(_("ServerPool not set!"))
|
LOG.warning(_("ServerPool not set!"))
|
||||||
|
|
||||||
def _send_all_data(self, send_ports=True, send_floating_ips=True,
|
def _get_all_data(self, get_ports=True, get_floating_ips=True,
|
||||||
send_routers=True):
|
get_routers=True):
|
||||||
"""Pushes all data to network ctrl (networks/ports, ports/attachments).
|
|
||||||
|
|
||||||
This gives the controller an option to re-sync it's persistent store
|
|
||||||
with neutron's current view of that data.
|
|
||||||
"""
|
|
||||||
admin_context = qcontext.get_admin_context()
|
admin_context = qcontext.get_admin_context()
|
||||||
networks = []
|
networks = []
|
||||||
|
|
||||||
@ -186,11 +182,11 @@ class NeutronRestProxyV2Base(db_base_plugin_v2.NeutronDbPluginV2,
|
|||||||
for net in all_networks:
|
for net in all_networks:
|
||||||
mapped_network = self._get_mapped_network_with_subnets(net)
|
mapped_network = self._get_mapped_network_with_subnets(net)
|
||||||
flips_n_ports = {}
|
flips_n_ports = {}
|
||||||
if send_floating_ips:
|
if get_floating_ips:
|
||||||
flips_n_ports = self._get_network_with_floatingips(
|
flips_n_ports = self._get_network_with_floatingips(
|
||||||
mapped_network)
|
mapped_network)
|
||||||
|
|
||||||
if send_ports:
|
if get_ports:
|
||||||
ports = []
|
ports = []
|
||||||
net_filter = {'network_id': [net.get('id')]}
|
net_filter = {'network_id': [net.get('id')]}
|
||||||
net_ports = self.get_ports(admin_context,
|
net_ports = self.get_ports(admin_context,
|
||||||
@ -209,12 +205,9 @@ class NeutronRestProxyV2Base(db_base_plugin_v2.NeutronDbPluginV2,
|
|||||||
if flips_n_ports:
|
if flips_n_ports:
|
||||||
networks.append(flips_n_ports)
|
networks.append(flips_n_ports)
|
||||||
|
|
||||||
resource = '/topology'
|
data = {'networks': networks}
|
||||||
data = {
|
|
||||||
'networks': networks,
|
|
||||||
}
|
|
||||||
|
|
||||||
if send_routers:
|
if get_routers:
|
||||||
routers = []
|
routers = []
|
||||||
all_routers = self.get_routers(admin_context) or []
|
all_routers = self.get_routers(admin_context) or []
|
||||||
for router in all_routers:
|
for router in all_routers:
|
||||||
@ -238,9 +231,21 @@ class NeutronRestProxyV2Base(db_base_plugin_v2.NeutronDbPluginV2,
|
|||||||
routers.append(mapped_router)
|
routers.append(mapped_router)
|
||||||
|
|
||||||
data.update({'routers': routers})
|
data.update({'routers': routers})
|
||||||
|
return data
|
||||||
|
|
||||||
|
def _send_all_data(self, send_ports=True, send_floating_ips=True,
|
||||||
|
send_routers=True, timeout=None,
|
||||||
|
triggered_by_tenant=None):
|
||||||
|
"""Pushes all data to network ctrl (networks/ports, ports/attachments).
|
||||||
|
|
||||||
|
This gives the controller an option to re-sync it's persistent store
|
||||||
|
with neutron's current view of that data.
|
||||||
|
"""
|
||||||
|
data = self._get_all_data(send_ports, send_floating_ips, send_routers)
|
||||||
|
data['triggered_by_tenant'] = triggered_by_tenant
|
||||||
errstr = _("Unable to update remote topology: %s")
|
errstr = _("Unable to update remote topology: %s")
|
||||||
return self.servers.rest_action('PUT', resource, data, errstr)
|
return self.servers.rest_action('PUT', servermanager.TOPOLOGY_PATH,
|
||||||
|
data, errstr, timeout=timeout)
|
||||||
|
|
||||||
def _get_network_with_floatingips(self, network, context=None):
|
def _get_network_with_floatingips(self, network, context=None):
|
||||||
if context is None:
|
if context is None:
|
||||||
@ -386,15 +391,38 @@ class NeutronRestProxyV2Base(db_base_plugin_v2.NeutronDbPluginV2,
|
|||||||
try:
|
try:
|
||||||
self.servers.rest_create_port(tenant_id, net_id, port)
|
self.servers.rest_create_port(tenant_id, net_id, port)
|
||||||
except servermanager.RemoteRestError as e:
|
except servermanager.RemoteRestError as e:
|
||||||
LOG.error(
|
# 404 should never be received on a port create unless
|
||||||
_("NeutronRestProxyV2: Unable to create port: %s"), e)
|
# there are inconsistencies between the data in neutron
|
||||||
try:
|
# and the data in the backend.
|
||||||
self._set_port_status(port['id'], const.PORT_STATUS_ERROR)
|
# Run a sync to get it consistent.
|
||||||
except exceptions.PortNotFound:
|
if (cfg.CONF.RESTPROXY.auto_sync_on_failure and
|
||||||
# If port is already gone from DB and there was an error
|
e.status == httplib.NOT_FOUND and
|
||||||
# creating on the backend, everything is already consistent
|
servermanager.NXNETWORK in e.reason):
|
||||||
pass
|
LOG.error(_("Iconsistency with backend controller "
|
||||||
return
|
"triggering full synchronization."))
|
||||||
|
# args depend on if we are operating in ML2 driver
|
||||||
|
# or as the full plugin
|
||||||
|
topoargs = self.servers.get_topo_function_args
|
||||||
|
self._send_all_data(
|
||||||
|
send_ports=topoargs['get_ports'],
|
||||||
|
send_floating_ips=topoargs['get_floating_ips'],
|
||||||
|
send_routers=topoargs['get_routers'],
|
||||||
|
triggered_by_tenant=tenant_id
|
||||||
|
)
|
||||||
|
# If the full sync worked, the port will be created
|
||||||
|
# on the controller so it can be safely marked as active
|
||||||
|
else:
|
||||||
|
# Any errors that don't result in a successful auto-sync
|
||||||
|
# require that the port be placed into the error state.
|
||||||
|
LOG.error(
|
||||||
|
_("NeutronRestProxyV2: Unable to create port: %s"), e)
|
||||||
|
try:
|
||||||
|
self._set_port_status(port['id'], const.PORT_STATUS_ERROR)
|
||||||
|
except exceptions.PortNotFound:
|
||||||
|
# If port is already gone from DB and there was an error
|
||||||
|
# creating on the backend, everything is already consistent
|
||||||
|
pass
|
||||||
|
return
|
||||||
new_status = (const.PORT_STATUS_ACTIVE if port['state'] == 'UP'
|
new_status = (const.PORT_STATUS_ACTIVE if port['state'] == 'UP'
|
||||||
else const.PORT_STATUS_DOWN)
|
else const.PORT_STATUS_DOWN)
|
||||||
try:
|
try:
|
||||||
@ -448,6 +476,10 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
|
|||||||
|
|
||||||
# init network ctrl connections
|
# init network ctrl connections
|
||||||
self.servers = servermanager.ServerPool(server_timeout)
|
self.servers = servermanager.ServerPool(server_timeout)
|
||||||
|
self.servers.get_topo_function = self._get_all_data
|
||||||
|
self.servers.get_topo_function_args = {'get_ports': True,
|
||||||
|
'get_floating_ips': True,
|
||||||
|
'get_routers': True}
|
||||||
|
|
||||||
self.network_scheduler = importutils.import_object(
|
self.network_scheduler = importutils.import_object(
|
||||||
cfg.CONF.network_scheduler_driver
|
cfg.CONF.network_scheduler_driver
|
||||||
|
@ -34,13 +34,15 @@ import base64
|
|||||||
import httplib
|
import httplib
|
||||||
import json
|
import json
|
||||||
import socket
|
import socket
|
||||||
|
import time
|
||||||
|
|
||||||
|
import eventlet
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
|
|
||||||
from neutron.common import exceptions
|
from neutron.common import exceptions
|
||||||
from neutron.common import utils
|
from neutron.common import utils
|
||||||
from neutron.openstack.common import log as logging
|
from neutron.openstack.common import log as logging
|
||||||
|
from neutron.plugins.bigswitch.db import consistency_db as cdb
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -56,23 +58,34 @@ PORTS_PATH = "/tenants/%s/networks/%s/ports/%s"
|
|||||||
ATTACHMENT_PATH = "/tenants/%s/networks/%s/ports/%s/attachment"
|
ATTACHMENT_PATH = "/tenants/%s/networks/%s/ports/%s/attachment"
|
||||||
ROUTERS_PATH = "/tenants/%s/routers/%s"
|
ROUTERS_PATH = "/tenants/%s/routers/%s"
|
||||||
ROUTER_INTF_PATH = "/tenants/%s/routers/%s/interfaces/%s"
|
ROUTER_INTF_PATH = "/tenants/%s/routers/%s/interfaces/%s"
|
||||||
|
TOPOLOGY_PATH = "/topology"
|
||||||
|
HEALTH_PATH = "/health"
|
||||||
SUCCESS_CODES = range(200, 207)
|
SUCCESS_CODES = range(200, 207)
|
||||||
FAILURE_CODES = [0, 301, 302, 303, 400, 401, 403, 404, 500, 501, 502, 503,
|
FAILURE_CODES = [0, 301, 302, 303, 400, 401, 403, 404, 500, 501, 502, 503,
|
||||||
504, 505]
|
504, 505]
|
||||||
BASE_URI = '/networkService/v1.1'
|
BASE_URI = '/networkService/v1.1'
|
||||||
ORCHESTRATION_SERVICE_ID = 'Neutron v2.0'
|
ORCHESTRATION_SERVICE_ID = 'Neutron v2.0'
|
||||||
|
HASH_MATCH_HEADER = 'X-BSN-BVS-HASH-MATCH'
|
||||||
|
# error messages
|
||||||
|
NXNETWORK = 'NXVNS'
|
||||||
|
|
||||||
|
|
||||||
class RemoteRestError(exceptions.NeutronException):
|
class RemoteRestError(exceptions.NeutronException):
|
||||||
message = _("Error in REST call to remote network "
|
message = _("Error in REST call to remote network "
|
||||||
"controller: %(reason)s")
|
"controller: %(reason)s")
|
||||||
|
status = None
|
||||||
|
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
self.status = kwargs.pop('status', None)
|
||||||
|
self.reason = kwargs.get('reason')
|
||||||
|
super(RemoteRestError, self).__init__(**kwargs)
|
||||||
|
|
||||||
|
|
||||||
class ServerProxy(object):
|
class ServerProxy(object):
|
||||||
"""REST server proxy to a network controller."""
|
"""REST server proxy to a network controller."""
|
||||||
|
|
||||||
def __init__(self, server, port, ssl, auth, neutron_id, timeout,
|
def __init__(self, server, port, ssl, auth, neutron_id, timeout,
|
||||||
base_uri, name):
|
base_uri, name, mypool):
|
||||||
self.server = server
|
self.server = server
|
||||||
self.port = port
|
self.port = port
|
||||||
self.ssl = ssl
|
self.ssl = ssl
|
||||||
@ -84,6 +97,8 @@ class ServerProxy(object):
|
|||||||
self.neutron_id = neutron_id
|
self.neutron_id = neutron_id
|
||||||
self.failed = False
|
self.failed = False
|
||||||
self.capabilities = []
|
self.capabilities = []
|
||||||
|
# enable server to reference parent pool
|
||||||
|
self.mypool = mypool
|
||||||
if auth:
|
if auth:
|
||||||
self.auth = 'Basic ' + base64.encodestring(auth).strip()
|
self.auth = 'Basic ' + base64.encodestring(auth).strip()
|
||||||
|
|
||||||
@ -99,7 +114,7 @@ class ServerProxy(object):
|
|||||||
'cap': self.capabilities})
|
'cap': self.capabilities})
|
||||||
return self.capabilities
|
return self.capabilities
|
||||||
|
|
||||||
def rest_call(self, action, resource, data='', headers=None):
|
def rest_call(self, action, resource, data='', headers={}, timeout=None):
|
||||||
uri = self.base_uri + resource
|
uri = self.base_uri + resource
|
||||||
body = json.dumps(data)
|
body = json.dumps(data)
|
||||||
if not headers:
|
if not headers:
|
||||||
@ -109,6 +124,7 @@ class ServerProxy(object):
|
|||||||
headers['NeutronProxy-Agent'] = self.name
|
headers['NeutronProxy-Agent'] = self.name
|
||||||
headers['Instance-ID'] = self.neutron_id
|
headers['Instance-ID'] = self.neutron_id
|
||||||
headers['Orchestration-Service-ID'] = ORCHESTRATION_SERVICE_ID
|
headers['Orchestration-Service-ID'] = ORCHESTRATION_SERVICE_ID
|
||||||
|
headers[HASH_MATCH_HEADER] = self.mypool.consistency_hash
|
||||||
if self.auth:
|
if self.auth:
|
||||||
headers['Authorization'] = self.auth
|
headers['Authorization'] = self.auth
|
||||||
|
|
||||||
@ -121,16 +137,17 @@ class ServerProxy(object):
|
|||||||
'action': action})
|
'action': action})
|
||||||
|
|
||||||
conn = None
|
conn = None
|
||||||
|
timeout = timeout or self.timeout
|
||||||
if self.ssl:
|
if self.ssl:
|
||||||
conn = httplib.HTTPSConnection(
|
conn = httplib.HTTPSConnection(
|
||||||
self.server, self.port, timeout=self.timeout)
|
self.server, self.port, timeout=timeout)
|
||||||
if conn is None:
|
if conn is None:
|
||||||
LOG.error(_('ServerProxy: Could not establish HTTPS '
|
LOG.error(_('ServerProxy: Could not establish HTTPS '
|
||||||
'connection'))
|
'connection'))
|
||||||
return 0, None, None, None
|
return 0, None, None, None
|
||||||
else:
|
else:
|
||||||
conn = httplib.HTTPConnection(
|
conn = httplib.HTTPConnection(
|
||||||
self.server, self.port, timeout=self.timeout)
|
self.server, self.port, timeout=timeout)
|
||||||
if conn is None:
|
if conn is None:
|
||||||
LOG.error(_('ServerProxy: Could not establish HTTP '
|
LOG.error(_('ServerProxy: Could not establish HTTP '
|
||||||
'connection'))
|
'connection'))
|
||||||
@ -139,6 +156,9 @@ class ServerProxy(object):
|
|||||||
try:
|
try:
|
||||||
conn.request(action, uri, body, headers)
|
conn.request(action, uri, body, headers)
|
||||||
response = conn.getresponse()
|
response = conn.getresponse()
|
||||||
|
newhash = response.getheader(HASH_MATCH_HEADER)
|
||||||
|
if newhash:
|
||||||
|
self._put_consistency_hash(newhash)
|
||||||
respstr = response.read()
|
respstr = response.read()
|
||||||
respdata = respstr
|
respdata = respstr
|
||||||
if response.status in self.success_codes:
|
if response.status in self.success_codes:
|
||||||
@ -160,6 +180,10 @@ class ServerProxy(object):
|
|||||||
'data': ret[3]})
|
'data': ret[3]})
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
def _put_consistency_hash(self, newhash):
|
||||||
|
self.mypool.consistency_hash = newhash
|
||||||
|
cdb.put_consistency_hash(newhash)
|
||||||
|
|
||||||
|
|
||||||
class ServerPool(object):
|
class ServerPool(object):
|
||||||
|
|
||||||
@ -180,6 +204,17 @@ class ServerPool(object):
|
|||||||
if timeout is not None:
|
if timeout is not None:
|
||||||
self.timeout = timeout
|
self.timeout = timeout
|
||||||
|
|
||||||
|
# Function to use to retrieve topology for consistency syncs.
|
||||||
|
# Needs to be set by module that uses the servermanager.
|
||||||
|
self.get_topo_function = None
|
||||||
|
self.get_topo_function_args = {}
|
||||||
|
|
||||||
|
# Hash to send to backend with request as expected previous
|
||||||
|
# state to verify consistency.
|
||||||
|
self.consistency_hash = cdb.get_consistency_hash()
|
||||||
|
eventlet.spawn(self._consistency_watchdog,
|
||||||
|
cfg.CONF.RESTPROXY.consistency_interval)
|
||||||
|
|
||||||
if not servers:
|
if not servers:
|
||||||
raise cfg.Error(_('Servers not defined. Aborting server manager.'))
|
raise cfg.Error(_('Servers not defined. Aborting server manager.'))
|
||||||
servers = [s if len(s.rsplit(':', 1)) == 2
|
servers = [s if len(s.rsplit(':', 1)) == 2
|
||||||
@ -210,7 +245,7 @@ class ServerPool(object):
|
|||||||
|
|
||||||
def server_proxy_for(self, server, port):
|
def server_proxy_for(self, server, port):
|
||||||
return ServerProxy(server, port, self.ssl, self.auth, self.neutron_id,
|
return ServerProxy(server, port, self.ssl, self.auth, self.neutron_id,
|
||||||
self.timeout, self.base_uri, self.name)
|
self.timeout, self.base_uri, self.name, mypool=self)
|
||||||
|
|
||||||
def server_failure(self, resp, ignore_codes=[]):
|
def server_failure(self, resp, ignore_codes=[]):
|
||||||
"""Define failure codes as required.
|
"""Define failure codes as required.
|
||||||
@ -228,10 +263,27 @@ class ServerPool(object):
|
|||||||
return resp[0] in SUCCESS_CODES
|
return resp[0] in SUCCESS_CODES
|
||||||
|
|
||||||
@utils.synchronized('bsn-rest-call')
|
@utils.synchronized('bsn-rest-call')
|
||||||
def rest_call(self, action, resource, data, headers, ignore_codes):
|
def rest_call(self, action, resource, data, headers, ignore_codes,
|
||||||
|
timeout=None):
|
||||||
good_first = sorted(self.servers, key=lambda x: x.failed)
|
good_first = sorted(self.servers, key=lambda x: x.failed)
|
||||||
|
first_response = None
|
||||||
for active_server in good_first:
|
for active_server in good_first:
|
||||||
ret = active_server.rest_call(action, resource, data, headers)
|
ret = active_server.rest_call(action, resource, data, headers,
|
||||||
|
timeout)
|
||||||
|
# If inconsistent, do a full synchronization
|
||||||
|
if ret[0] == httplib.CONFLICT:
|
||||||
|
if not self.get_topo_function:
|
||||||
|
raise cfg.Error(_('Server requires synchronization, '
|
||||||
|
'but no topology function was defined.'))
|
||||||
|
data = self.get_topo_function(**self.get_topo_function_args)
|
||||||
|
active_server.rest_call('PUT', TOPOLOGY_PATH, data,
|
||||||
|
timeout=None)
|
||||||
|
# Store the first response as the error to be bubbled up to the
|
||||||
|
# user since it was a good server. Subsequent servers will most
|
||||||
|
# likely be cluster slaves and won't have a useful error for the
|
||||||
|
# user (e.g. 302 redirect to master)
|
||||||
|
if not first_response:
|
||||||
|
first_response = ret
|
||||||
if not self.server_failure(ret, ignore_codes):
|
if not self.server_failure(ret, ignore_codes):
|
||||||
active_server.failed = False
|
active_server.failed = False
|
||||||
return ret
|
return ret
|
||||||
@ -254,10 +306,10 @@ class ServerPool(object):
|
|||||||
{'action': action,
|
{'action': action,
|
||||||
'server': tuple((s.server,
|
'server': tuple((s.server,
|
||||||
s.port) for s in self.servers)})
|
s.port) for s in self.servers)})
|
||||||
return (0, None, None, None)
|
return first_response
|
||||||
|
|
||||||
def rest_action(self, action, resource, data='', errstr='%s',
|
def rest_action(self, action, resource, data='', errstr='%s',
|
||||||
ignore_codes=[], headers=None):
|
ignore_codes=[], headers={}, timeout=None):
|
||||||
"""
|
"""
|
||||||
Wrapper for rest_call that verifies success and raises a
|
Wrapper for rest_call that verifies success and raises a
|
||||||
RemoteRestError on failure with a provided error string
|
RemoteRestError on failure with a provided error string
|
||||||
@ -266,10 +318,11 @@ class ServerPool(object):
|
|||||||
"""
|
"""
|
||||||
if not ignore_codes and action == 'DELETE':
|
if not ignore_codes and action == 'DELETE':
|
||||||
ignore_codes = [404]
|
ignore_codes = [404]
|
||||||
resp = self.rest_call(action, resource, data, headers, ignore_codes)
|
resp = self.rest_call(action, resource, data, headers, ignore_codes,
|
||||||
|
timeout)
|
||||||
if self.server_failure(resp, ignore_codes):
|
if self.server_failure(resp, ignore_codes):
|
||||||
LOG.error(errstr, resp[2])
|
LOG.error(errstr, resp[2])
|
||||||
raise RemoteRestError(reason=resp[2])
|
raise RemoteRestError(reason=resp[2], status=resp[0])
|
||||||
if resp[0] in ignore_codes:
|
if resp[0] in ignore_codes:
|
||||||
LOG.warning(_("NeutronRestProxyV2: Received and ignored error "
|
LOG.warning(_("NeutronRestProxyV2: Received and ignored error "
|
||||||
"code %(code)s on %(action)s action to resource "
|
"code %(code)s on %(action)s action to resource "
|
||||||
@ -361,3 +414,16 @@ class ServerPool(object):
|
|||||||
resource = FLOATINGIPS_PATH % (tenant_id, oldid)
|
resource = FLOATINGIPS_PATH % (tenant_id, oldid)
|
||||||
errstr = _("Unable to delete floating IP: %s")
|
errstr = _("Unable to delete floating IP: %s")
|
||||||
self.rest_action('DELETE', resource, errstr=errstr)
|
self.rest_action('DELETE', resource, errstr=errstr)
|
||||||
|
|
||||||
|
def _consistency_watchdog(self, polling_interval=60):
|
||||||
|
if 'consistency' not in self.get_capabilities():
|
||||||
|
LOG.warning(_("Backend server(s) do not support automated "
|
||||||
|
"consitency checks."))
|
||||||
|
return
|
||||||
|
while True:
|
||||||
|
# If consistency is supported, all we have to do is make any
|
||||||
|
# rest call and the consistency header will be added. If it
|
||||||
|
# doesn't match, the backend will return a synchronization error
|
||||||
|
# that will be handled by the rest_call.
|
||||||
|
time.sleep(polling_interval)
|
||||||
|
self.servers.rest_call('GET', HEALTH_PATH)
|
||||||
|
@ -16,6 +16,8 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
#
|
#
|
||||||
# @author: Sumit Naiksatam, sumitnaiksatam@gmail.com, Big Switch Networks, Inc.
|
# @author: Sumit Naiksatam, sumitnaiksatam@gmail.com, Big Switch Networks, Inc.
|
||||||
|
# @author: Kevin Benton, Big Switch Networks, Inc.
|
||||||
|
|
||||||
import eventlet
|
import eventlet
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
|
|
||||||
@ -25,7 +27,7 @@ from neutron.openstack.common import log
|
|||||||
from neutron.plugins.bigswitch import config as pl_config
|
from neutron.plugins.bigswitch import config as pl_config
|
||||||
from neutron.plugins.bigswitch.db import porttracker_db
|
from neutron.plugins.bigswitch.db import porttracker_db
|
||||||
from neutron.plugins.bigswitch.plugin import NeutronRestProxyV2Base
|
from neutron.plugins.bigswitch.plugin import NeutronRestProxyV2Base
|
||||||
from neutron.plugins.bigswitch.servermanager import ServerPool
|
from neutron.plugins.bigswitch import servermanager
|
||||||
from neutron.plugins.ml2 import driver_api as api
|
from neutron.plugins.ml2 import driver_api as api
|
||||||
|
|
||||||
|
|
||||||
@ -51,7 +53,11 @@ class BigSwitchMechanismDriver(NeutronRestProxyV2Base,
|
|||||||
self.native_bulk_support = False
|
self.native_bulk_support = False
|
||||||
|
|
||||||
# init network ctrl connections
|
# init network ctrl connections
|
||||||
self.servers = ServerPool(server_timeout)
|
self.servers = servermanager.ServerPool(server_timeout)
|
||||||
|
self.servers.get_topo_function = self._get_all_data
|
||||||
|
self.servers.get_topo_function_args = {'get_ports': True,
|
||||||
|
'get_floating_ips': False,
|
||||||
|
'get_routers': False}
|
||||||
self.segmentation_types = ', '.join(cfg.CONF.ml2.type_drivers)
|
self.segmentation_types = ', '.join(cfg.CONF.ml2.type_drivers)
|
||||||
LOG.debug(_("Initialization done"))
|
LOG.debug(_("Initialization done"))
|
||||||
|
|
||||||
@ -102,6 +108,8 @@ class BigSwitchMechanismDriver(NeutronRestProxyV2Base,
|
|||||||
prepped_port = self._map_state_and_status(prepped_port)
|
prepped_port = self._map_state_and_status(prepped_port)
|
||||||
if (portbindings.HOST_ID not in prepped_port or
|
if (portbindings.HOST_ID not in prepped_port or
|
||||||
prepped_port[portbindings.HOST_ID] == ''):
|
prepped_port[portbindings.HOST_ID] == ''):
|
||||||
|
LOG.warning(_("Ignoring port notification to controller because "
|
||||||
|
"of missing host ID."))
|
||||||
# in ML2, controller doesn't care about ports without
|
# in ML2, controller doesn't care about ports without
|
||||||
# the host_id set
|
# the host_id set
|
||||||
return False
|
return False
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
import json
|
import json
|
||||||
|
|
||||||
from neutron.openstack.common import log as logging
|
from neutron.openstack.common import log as logging
|
||||||
|
from neutron.plugins.bigswitch import servermanager
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -35,13 +36,16 @@ class HTTPResponseMock():
|
|||||||
def read(self):
|
def read(self):
|
||||||
return "{'status': '200 OK'}"
|
return "{'status': '200 OK'}"
|
||||||
|
|
||||||
|
def getheader(self, header):
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
class HTTPResponseMock404(HTTPResponseMock):
|
class HTTPResponseMock404(HTTPResponseMock):
|
||||||
status = 404
|
status = 404
|
||||||
reason = 'Not Found'
|
reason = 'Not Found'
|
||||||
|
|
||||||
def read(self):
|
def read(self):
|
||||||
return "{'status': '404 Not Found'}"
|
return "{'status': '%s 404 Not Found'}" % servermanager.NXNETWORK
|
||||||
|
|
||||||
|
|
||||||
class HTTPResponseMock500(HTTPResponseMock):
|
class HTTPResponseMock500(HTTPResponseMock):
|
||||||
@ -99,6 +103,13 @@ class HTTPConnectionMock(object):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class HTTPConnectionMock404(HTTPConnectionMock):
|
||||||
|
|
||||||
|
def __init__(self, server, port, timeout):
|
||||||
|
self.response = HTTPResponseMock404(None)
|
||||||
|
self.broken = True
|
||||||
|
|
||||||
|
|
||||||
class HTTPConnectionMock500(HTTPConnectionMock):
|
class HTTPConnectionMock500(HTTPConnectionMock):
|
||||||
|
|
||||||
def __init__(self, server, port, timeout):
|
def __init__(self, server, port, timeout):
|
||||||
|
@ -29,8 +29,10 @@ RESTPROXY_PKG_PATH = 'neutron.plugins.bigswitch.plugin'
|
|||||||
NOTIFIER = 'neutron.plugins.bigswitch.plugin.AgentNotifierApi'
|
NOTIFIER = 'neutron.plugins.bigswitch.plugin.AgentNotifierApi'
|
||||||
CALLBACKS = 'neutron.plugins.bigswitch.plugin.RestProxyCallbacks'
|
CALLBACKS = 'neutron.plugins.bigswitch.plugin.RestProxyCallbacks'
|
||||||
CERTFETCH = 'neutron.plugins.bigswitch.servermanager.ServerPool._fetch_cert'
|
CERTFETCH = 'neutron.plugins.bigswitch.servermanager.ServerPool._fetch_cert'
|
||||||
|
SERVER_MANAGER = 'neutron.plugins.bigswitch.servermanager'
|
||||||
HTTPCON = 'httplib.HTTPConnection'
|
HTTPCON = 'httplib.HTTPConnection'
|
||||||
SPAWN = 'eventlet.GreenPool.spawn_n'
|
SPAWN = 'eventlet.GreenPool.spawn_n'
|
||||||
|
CWATCH = SERVER_MANAGER + '.ServerPool._consistency_watchdog'
|
||||||
|
|
||||||
|
|
||||||
class BigSwitchTestBase(object):
|
class BigSwitchTestBase(object):
|
||||||
@ -50,9 +52,11 @@ class BigSwitchTestBase(object):
|
|||||||
self.plugin_notifier_p = mock.patch(NOTIFIER)
|
self.plugin_notifier_p = mock.patch(NOTIFIER)
|
||||||
self.callbacks_p = mock.patch(CALLBACKS)
|
self.callbacks_p = mock.patch(CALLBACKS)
|
||||||
self.spawn_p = mock.patch(SPAWN)
|
self.spawn_p = mock.patch(SPAWN)
|
||||||
|
self.watch_p = mock.patch(CWATCH)
|
||||||
self.addCleanup(mock.patch.stopall)
|
self.addCleanup(mock.patch.stopall)
|
||||||
self.addCleanup(db.clear_db)
|
self.addCleanup(db.clear_db)
|
||||||
self.callbacks_p.start()
|
self.callbacks_p.start()
|
||||||
self.plugin_notifier_p.start()
|
self.plugin_notifier_p.start()
|
||||||
self.httpPatch.start()
|
self.httpPatch.start()
|
||||||
self.spawn_p.start()
|
self.spawn_p.start()
|
||||||
|
self.watch_p.start()
|
||||||
|
@ -15,7 +15,8 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from mock import patch
|
from contextlib import nested
|
||||||
|
import mock
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
import webob.exc
|
import webob.exc
|
||||||
|
|
||||||
@ -29,6 +30,8 @@ from neutron.tests.unit import test_api_v2
|
|||||||
import neutron.tests.unit.test_db_plugin as test_plugin
|
import neutron.tests.unit.test_db_plugin as test_plugin
|
||||||
import neutron.tests.unit.test_extension_allowedaddresspairs as test_addr_pair
|
import neutron.tests.unit.test_extension_allowedaddresspairs as test_addr_pair
|
||||||
|
|
||||||
|
patch = mock.patch
|
||||||
|
|
||||||
|
|
||||||
class BigSwitchProxyPluginV2TestCase(test_base.BigSwitchTestBase,
|
class BigSwitchProxyPluginV2TestCase(test_base.BigSwitchTestBase,
|
||||||
test_plugin.NeutronDbPluginV2TestCase):
|
test_plugin.NeutronDbPluginV2TestCase):
|
||||||
@ -150,6 +153,27 @@ class TestBigSwitchProxyPortsV2(test_plugin.TestPortsV2,
|
|||||||
res = req.get_response(self.api)
|
res = req.get_response(self.api)
|
||||||
self.assertEqual(res.status_int, 200)
|
self.assertEqual(res.status_int, 200)
|
||||||
|
|
||||||
|
def test_create404_triggers_sync(self):
|
||||||
|
# allow async port thread for this patch
|
||||||
|
self.spawn_p.stop()
|
||||||
|
with nested(
|
||||||
|
self.subnet(),
|
||||||
|
patch('httplib.HTTPConnection', create=True,
|
||||||
|
new=fake_server.HTTPConnectionMock404),
|
||||||
|
patch(test_base.RESTPROXY_PKG_PATH
|
||||||
|
+ '.NeutronRestProxyV2._send_all_data')
|
||||||
|
) as (s, mock_http, mock_send_all):
|
||||||
|
with self.port(subnet=s, device_id='somedevid') as p:
|
||||||
|
# wait for the async port thread to finish
|
||||||
|
plugin = NeutronManager.get_plugin()
|
||||||
|
plugin.evpool.waitall()
|
||||||
|
call = mock.call(
|
||||||
|
send_routers=True, send_ports=True, send_floating_ips=True,
|
||||||
|
triggered_by_tenant=p['port']['tenant_id']
|
||||||
|
)
|
||||||
|
mock_send_all.assert_has_calls([call])
|
||||||
|
self.spawn_p.start()
|
||||||
|
|
||||||
|
|
||||||
class TestBigSwitchProxyPortsV2IVS(test_plugin.TestPortsV2,
|
class TestBigSwitchProxyPortsV2IVS(test_plugin.TestPortsV2,
|
||||||
BigSwitchProxyPluginV2TestCase,
|
BigSwitchProxyPluginV2TestCase,
|
||||||
|
@ -15,9 +15,13 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
from contextlib import nested
|
||||||
|
import mock
|
||||||
import webob.exc
|
import webob.exc
|
||||||
|
|
||||||
from neutron.extensions import portbindings
|
from neutron.extensions import portbindings
|
||||||
|
from neutron.manager import NeutronManager
|
||||||
|
from neutron.plugins.bigswitch import servermanager
|
||||||
from neutron.plugins.ml2 import config as ml2_config
|
from neutron.plugins.ml2 import config as ml2_config
|
||||||
from neutron.plugins.ml2.drivers import type_vlan as vlan_config
|
from neutron.plugins.ml2.drivers import type_vlan as vlan_config
|
||||||
import neutron.tests.unit.bigswitch.test_restproxy_plugin as trp
|
import neutron.tests.unit.bigswitch.test_restproxy_plugin as trp
|
||||||
@ -27,6 +31,9 @@ from neutron.tests.unit import test_db_plugin
|
|||||||
PHYS_NET = 'physnet1'
|
PHYS_NET = 'physnet1'
|
||||||
VLAN_START = 1000
|
VLAN_START = 1000
|
||||||
VLAN_END = 1100
|
VLAN_END = 1100
|
||||||
|
SERVER_POOL = 'neutron.plugins.bigswitch.servermanager.ServerPool'
|
||||||
|
DRIVER_MOD = 'neutron.plugins.ml2.drivers.mech_bigswitch.driver'
|
||||||
|
DRIVER = DRIVER_MOD + '.BigSwitchMechanismDriver'
|
||||||
|
|
||||||
|
|
||||||
class TestBigSwitchMechDriverBase(trp.BigSwitchProxyPluginV2TestCase):
|
class TestBigSwitchMechDriverBase(trp.BigSwitchProxyPluginV2TestCase):
|
||||||
@ -93,3 +100,26 @@ class TestBigSwitchMechDriverPortsV2(test_db_plugin.TestPortsV2,
|
|||||||
if res.status_int >= 400:
|
if res.status_int >= 400:
|
||||||
raise webob.exc.HTTPClientError(code=res.status_int)
|
raise webob.exc.HTTPClientError(code=res.status_int)
|
||||||
return self.deserialize(fmt, res)
|
return self.deserialize(fmt, res)
|
||||||
|
|
||||||
|
def test_create404_triggers_background_sync(self):
|
||||||
|
# allow the async background thread to run for this test
|
||||||
|
self.spawn_p.stop()
|
||||||
|
with nested(
|
||||||
|
mock.patch(SERVER_POOL + '.rest_create_port',
|
||||||
|
side_effect=servermanager.RemoteRestError(
|
||||||
|
reason=servermanager.NXNETWORK, status=404)),
|
||||||
|
mock.patch(DRIVER + '._send_all_data'),
|
||||||
|
self.port(**{'device_id': 'devid', 'binding:host_id': 'host'})
|
||||||
|
) as (mock_http, mock_send_all, p):
|
||||||
|
# wait for thread to finish
|
||||||
|
mm = NeutronManager.get_plugin().mechanism_manager
|
||||||
|
bigdriver = mm.mech_drivers['bigswitch'].obj
|
||||||
|
bigdriver.evpool.waitall()
|
||||||
|
mock_send_all.assert_has_calls([
|
||||||
|
mock.call(
|
||||||
|
send_routers=False, send_ports=True,
|
||||||
|
send_floating_ips=False,
|
||||||
|
triggered_by_tenant=p['port']['tenant_id']
|
||||||
|
)
|
||||||
|
])
|
||||||
|
self.spawn_p.start()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user