diff --git a/neutron/plugins/bigswitch/db/consistency_db.py b/neutron/plugins/bigswitch/db/consistency_db.py index cd89a26906..4d1a1db792 100644 --- a/neutron/plugins/bigswitch/db/consistency_db.py +++ b/neutron/plugins/bigswitch/db/consistency_db.py @@ -14,20 +14,22 @@ # under the License. import sqlalchemy as sa +from neutron.common import exceptions from neutron.db import api as db from neutron.db import model_base from neutron.openstack.common import log as logging LOG = logging.getLogger(__name__) -''' -A simple table to store the latest consistency hash -received from a server in case neutron gets restarted. -''' + +class MultipleReadForUpdateCalls(exceptions.NeutronException): + message = _("Only one read_for_update call may be made at a time.") class ConsistencyHash(model_base.BASEV2): ''' + A simple table to store the latest consistency hash + received from a server. For now we only support one global state so the hash_id will always be '1' ''' @@ -37,20 +39,44 @@ class ConsistencyHash(model_base.BASEV2): hash = sa.Column(sa.String(255), nullable=False) -def get_consistency_hash(hash_id='1'): - session = db.get_session() - with session.begin(subtransactions=True): - query = session.query(ConsistencyHash) - res = query.filter_by(hash_id=hash_id).first() - if not res: - return False - return res.hash +class HashHandler(object): + ''' + A wrapper object to keep track of the session and hold the SQL + lock between the read and the update to prevent other servers + from reading the hash during a transaction. + ''' + def __init__(self, context=None, hash_id='1'): + self.hash_id = hash_id + self.session = db.get_session() if not context else context.session + self.hash_db_obj = None + self.transaction = None + def read_for_update(self): + if self.transaction: + raise MultipleReadForUpdateCalls() + self.transaction = self.session.begin(subtransactions=True) + # Lock for update here to prevent another server from reading the hash + # while this one is in the middle of a transaction. + # This may not lock the SQL table in MySQL Galera deployments + # but that's okay because the worst case is a double-sync + res = (self.session.query(ConsistencyHash). + filter_by(hash_id=self.hash_id). + with_lockmode('update').first()) + if not res: + return '' + self.hash_db_obj = res + return res.hash -def put_consistency_hash(hash, hash_id='1'): - session = db.get_session() - with session.begin(subtransactions=True): - conhash = ConsistencyHash(hash_id=hash_id, hash=hash) - session.merge(conhash) + def put_hash(self, hash): + hash = hash or '' + if not self.transaction: + self.transaction = self.session.begin(subtransactions=True) + if self.hash_db_obj is not None: + self.hash_db_obj.hash = hash + else: + conhash = ConsistencyHash(hash_id=self.hash_id, hash=hash) + self.session.merge(conhash) + self.transaction.commit() + self.transaction = None LOG.debug(_("Consistency hash for group %(hash_id)s updated " - "to %(hash)s"), {'hash_id': hash_id, 'hash': hash}) + "to %(hash)s"), {'hash_id': self.hash_id, 'hash': hash}) diff --git a/neutron/plugins/bigswitch/plugin.py b/neutron/plugins/bigswitch/plugin.py index ef9cc03cd7..3cc6e00d10 100644 --- a/neutron/plugins/bigswitch/plugin.py +++ b/neutron/plugins/bigswitch/plugin.py @@ -44,6 +44,7 @@ on port-attach) on an additional PUT to do a bulk dump of all persistent data. """ import copy +import functools import httplib import re @@ -442,6 +443,14 @@ class NeutronRestProxyV2Base(db_base_plugin_v2.NeutronDbPluginV2, raise exceptions.PortNotFound(port_id=port_id) +def put_context_in_serverpool(f): + @functools.wraps(f) + def wrapper(self, context, *args, **kwargs): + self.servers.set_context(context) + return f(self, context, *args, **kwargs) + return wrapper + + class NeutronRestProxyV2(NeutronRestProxyV2Base, addr_pair_db.AllowedAddressPairsMixin, extradhcpopt_db.ExtraDhcpOptMixin, @@ -508,6 +517,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, # Consume from all consumers in threads self.conn.consume_in_threads() + @put_context_in_serverpool def create_network(self, context, network): """Create a network. @@ -551,6 +561,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, # return created network return new_net + @put_context_in_serverpool def update_network(self, context, net_id, network): """Updates the properties of a particular Virtual Network. @@ -590,6 +601,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, # NOTE(kevinbenton): workaround for eventlet/mysql deadlock @utils.synchronized('bsn-port-barrier') + @put_context_in_serverpool def delete_network(self, context, net_id): """Delete a network. :param context: neutron api request context @@ -612,6 +624,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, self._send_delete_network(orig_net, context) return ret_val + @put_context_in_serverpool def create_port(self, context, port): """Create a port, which is a connection point of a device (e.g., a VM NIC) to attach to a L2 Neutron network. @@ -702,6 +715,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, self._extend_port_dict_binding(context, port) return [self._fields(port, fields) for port in ports] + @put_context_in_serverpool def update_port(self, context, port_id, port): """Update values of a port. @@ -778,6 +792,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, # NOTE(kevinbenton): workaround for eventlet/mysql deadlock @utils.synchronized('bsn-port-barrier') + @put_context_in_serverpool def delete_port(self, context, port_id, l3_port_check=True): """Delete a port. :param context: neutron api request context @@ -803,6 +818,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, self._delete_port(context, port_id) self.servers.rest_delete_port(tenid, port['network_id'], port_id) + @put_context_in_serverpool def create_subnet(self, context, subnet): LOG.debug(_("NeutronRestProxyV2: create_subnet() called")) @@ -819,6 +835,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, self._send_update_network(orig_net, context) return new_subnet + @put_context_in_serverpool def update_subnet(self, context, id, subnet): LOG.debug(_("NeutronRestProxyV2: update_subnet() called")) @@ -837,6 +854,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, # NOTE(kevinbenton): workaround for eventlet/mysql deadlock @utils.synchronized('bsn-port-barrier') + @put_context_in_serverpool def delete_subnet(self, context, id): LOG.debug(_("NeutronRestProxyV2: delete_subnet() called")) orig_subnet = super(NeutronRestProxyV2, self).get_subnet(context, id) @@ -875,6 +893,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, return tenantset return defaultset + @put_context_in_serverpool def create_router(self, context, router): LOG.debug(_("NeutronRestProxyV2: create_router() called")) @@ -896,6 +915,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, # return created router return new_router + @put_context_in_serverpool def update_router(self, context, router_id, router): LOG.debug(_("NeutronRestProxyV2.update_router() called")) @@ -919,6 +939,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, # NOTE(kevinbenton): workaround for eventlet/mysql deadlock. # delete_router ends up calling _delete_port instead of delete_port. @utils.synchronized('bsn-port-barrier') + @put_context_in_serverpool def delete_router(self, context, router_id): LOG.debug(_("NeutronRestProxyV2: delete_router() called")) @@ -1009,6 +1030,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, interface_id) return del_ret + @put_context_in_serverpool def create_floatingip(self, context, floatingip): LOG.debug(_("NeutronRestProxyV2: create_floatingip() called")) @@ -1032,6 +1054,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, # return created floating IP return new_fl_ip + @put_context_in_serverpool def update_floatingip(self, context, id, floatingip): LOG.debug(_("NeutronRestProxyV2: update_floatingip() called")) @@ -1048,6 +1071,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, self._send_floatingip_update(context) return new_fl_ip + @put_context_in_serverpool def delete_floatingip(self, context, id): LOG.debug(_("NeutronRestProxyV2: delete_floatingip() called")) @@ -1072,6 +1096,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, # overriding method from l3_db as original method calls # self.delete_floatingip() which in turn calls self.delete_port() which # is locked with 'bsn-port-barrier' + @put_context_in_serverpool def delete_disassociated_floatingips(self, context, network_id): query = self._model_query(context, l3_db.FloatingIP) query = query.filter_by(floating_network_id=network_id, diff --git a/neutron/plugins/bigswitch/servermanager.py b/neutron/plugins/bigswitch/servermanager.py index 35cf18a21f..b2e56271bb 100644 --- a/neutron/plugins/bigswitch/servermanager.py +++ b/neutron/plugins/bigswitch/servermanager.py @@ -37,6 +37,7 @@ import socket import ssl import eventlet +import eventlet.corolocal from oslo.config import cfg from neutron.common import exceptions @@ -120,7 +121,7 @@ class ServerProxy(object): return self.capabilities def rest_call(self, action, resource, data='', headers={}, timeout=False, - reconnect=False): + reconnect=False, hash_handler=None): uri = self.base_uri + resource body = json.dumps(data) if not headers: @@ -130,7 +131,12 @@ class ServerProxy(object): headers['NeutronProxy-Agent'] = self.name headers['Instance-ID'] = self.neutron_id headers['Orchestration-Service-ID'] = ORCHESTRATION_SERVICE_ID - headers[HASH_MATCH_HEADER] = self.mypool.consistency_hash or '' + if hash_handler: + # this will be excluded on calls that don't need hashes + # (e.g. topology sync, capability checks) + headers[HASH_MATCH_HEADER] = hash_handler.read_for_update() + else: + hash_handler = cdb.HashHandler() if 'keep-alive' in self.capabilities: headers['Connection'] = 'keep-alive' else: @@ -177,9 +183,7 @@ class ServerProxy(object): try: self.currentconn.request(action, uri, body, headers) response = self.currentconn.getresponse() - newhash = response.getheader(HASH_MATCH_HEADER) - if newhash: - self._put_consistency_hash(newhash) + hash_handler.put_hash(response.getheader(HASH_MATCH_HEADER)) respstr = response.read() respdata = respstr if response.status in self.success_codes: @@ -215,10 +219,6 @@ class ServerProxy(object): 'data': ret[3]}) return ret - def _put_consistency_hash(self, newhash): - self.mypool.consistency_hash = newhash - cdb.put_consistency_hash(newhash) - class ServerPool(object): @@ -234,6 +234,7 @@ class ServerPool(object): self.neutron_id = cfg.CONF.RESTPROXY.neutron_id self.base_uri = base_uri self.name = name + self.contexts = {} self.timeout = cfg.CONF.RESTPROXY.server_timeout self.always_reconnect = not cfg.CONF.RESTPROXY.cache_connections default_port = 8000 @@ -245,10 +246,6 @@ class ServerPool(object): self.get_topo_function = None self.get_topo_function_args = {} - # Hash to send to backend with request as expected previous - # state to verify consistency. - self.consistency_hash = cdb.get_consistency_hash() - if not servers: raise cfg.Error(_('Servers not defined. Aborting server manager.')) servers = [s if len(s.rsplit(':', 1)) == 2 @@ -267,6 +264,19 @@ class ServerPool(object): cfg.CONF.RESTPROXY.consistency_interval) LOG.debug(_("ServerPool: initialization done")) + def set_context(self, context): + # this context needs to be local to the greenthread + # so concurrent requests don't use the wrong context + self.contexts[eventlet.corolocal.get_ident()] = context + + def pop_context(self): + # Don't store these contexts after use. They should only + # last for one request. + try: + return self.contexts.pop(eventlet.corolocal.get_ident()) + except KeyError: + return None + def get_capabilities(self): # lookup on first try try: @@ -393,12 +403,14 @@ class ServerPool(object): @utils.synchronized('bsn-rest-call') def rest_call(self, action, resource, data, headers, ignore_codes, timeout=False): + hash_handler = cdb.HashHandler(context=self.pop_context()) good_first = sorted(self.servers, key=lambda x: x.failed) first_response = None for active_server in good_first: ret = active_server.rest_call(action, resource, data, headers, timeout, - reconnect=self.always_reconnect) + reconnect=self.always_reconnect, + hash_handler=hash_handler) # If inconsistent, do a full synchronization if ret[0] == httplib.CONFLICT: if not self.get_topo_function: diff --git a/neutron/tests/unit/bigswitch/test_servermanager.py b/neutron/tests/unit/bigswitch/test_servermanager.py index 7523b9a4de..85583ed538 100644 --- a/neutron/tests/unit/bigswitch/test_servermanager.py +++ b/neutron/tests/unit/bigswitch/test_servermanager.py @@ -361,7 +361,8 @@ class ServerManagerTests(test_rp.BigSwitchProxyPluginV2TestCase): # making a call should trigger a conflict sync pl.servers.rest_call('GET', '/', '', None, []) srestmock.assert_has_calls([ - mock.call('GET', '/', '', None, False, reconnect=True), + mock.call('GET', '/', '', None, False, reconnect=True, + hash_handler=mock.ANY), mock.call('PUT', '/topology', {'routers': [], 'networks': []}, timeout=None)