BSN: Optimistic locking strategy for consistency

Summary:
  Adds an optimistic locking strategy for the Big Switch
  server manager so multiple Neutron servers wanting to
  communicate with the backend do not receive the consistency
  hash for use simultaneously.

  The bsn-rest-call semaphore is removed because serialization
  is now provided by the new locking scheme.

  A new DB engine is added because the consistency hashes
  need a life-cycle with rollbacks and other DB operations
  than cannot impact or be impacted by database operations
  happening on the regular Neutron objects.

  Unit tests are included for each of the new branches
  introduced.

Problem Statement:
  Requests to the Big Switch controllers must contain the
  consistency hash value received from the previous update.
  Otherwise, an inconsistency error will be triggered which
  will force a synchronization. Essentially, a new backend
  call must be prevented from reading from the consistency
  hash table in the DB until the previous call has updated
  the table with the hash from the server response.

  This can be addressed by a semaphore around the rest_call
  function for the single server use case and by a table lock
  on the consistency table for multiple Neutron servers.
  However, both solutions are inadequate because a single
  Neutron server does not scale and a table lock is not
  supported by common SQL HA deployments (e.g. Galera).

  This issue was previously addressed by deploying servers
  in an active-standby configuration. However, that only
  prevented the problem for HTTP API calls. All Neutron
  servers would respond to RPC messages, some of which would
  result in a port update and possible backend call which
  would trigger a conflict if it happened at the same time
  as a backend call from another server. These unnecessary
  syncs are unsustainable as the topology increases beyond
  ~3k VMs.

  Any solution needs to be back-portable to Icehouse so new
  database tables, new requirements, etc. are all out of the
  question.

Solution:
  This patch stores the lock for the consistency hash as a part
  of the DB record. The guaruntees the database offers around
  atomic insertion and constrained atomic updates offer the
  primitives necessary to ensure that only one process/thread
  can lock the record at once.

  The read_for_update method is modified to not return the hash
  in the database until an identifier is inserted into the
  current record or added as a new record. By using an UPDATE
  query with a WHERE clause restricting to the current state,
  only one of many concurrent callers to the DB will successfully
  update the rows. If a caller sees that it didn't update any
  rows, it will start the process over of trying to get the
  lock.

  If a caller observes that the same ID has the lock for
  more than 60 seconds, it will assume the holder has
  died and will attempt to take the lock. This is also done
  in a concurrency-safe UPDATE call since there may be many
  other callers may attempt to do the same thing. If it
  fails and the lock was taken by someone else, the process
  will start over.

  Some pseudo-code resembling the logic:
    read_current_lock
    if no_record:
      insert_lock
      sleep_and_retry if constraint_violation else return
    if current_is_locked and not timer_exceeded:
      sleep_and_retry
    if update_record_with_lock:
      return
    else:
      sleep_and_retry

Closes-Bug: #1374261
Change-Id: Ifa5a7c9749952bc2785a9bf3fed69ad55bf21acc
This commit is contained in:
Kevin Benton 2014-09-25 21:42:39 -07:00 committed by Kevin Benton
parent ee74e03db0
commit 029e9f7c5a
9 changed files with 318 additions and 23 deletions

View File

@ -12,13 +12,42 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import random
import re
import string
import time
from oslo.config import cfg
from oslo.db import exception as db_exc
from oslo.db.sqlalchemy import session
import sqlalchemy as sa
from neutron.db import api as db
from neutron.db import model_base
from neutron.openstack.common.gettextutils import _LI, _LW
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
# Maximum time in seconds to wait for a single record lock to be released
# NOTE: The total time waiting may exceed this if there are multiple servers
# waiting for the same lock
MAX_LOCK_WAIT_TIME = 15
def setup_db():
'''Helper to register models for unit tests'''
if HashHandler._FACADE is None:
HashHandler._FACADE = session.EngineFacade.from_config(
cfg.CONF, sqlite_fk=True)
ConsistencyHash.metadata.create_all(
HashHandler._FACADE.get_engine())
def clear_db():
'''Helper to unregister models and clear engine in unit tests'''
if not HashHandler._FACADE:
return
ConsistencyHash.metadata.drop_all(HashHandler._FACADE.get_engine())
HashHandler._FACADE = None
class ConsistencyHash(model_base.BASEV2):
@ -38,31 +67,157 @@ class HashHandler(object):
'''
A wrapper object to keep track of the session between the read
and the update operations.
'''
def __init__(self, context=None, hash_id='1'):
self.hash_id = hash_id
self.session = db.get_session() if not context else context.session
self.hash_db_obj = None
def read_for_update(self):
# REVISIT(kevinbenton): locking here with the DB is prone to deadlocks
# in various multi-REST-call scenarios (router intfs, flips, etc).
# Since it doesn't work in Galera deployments anyway, another sync
# mechanism will have to be introduced to prevent inefficient double
# syncs in HA deployments.
This class needs an SQL engine completely independent of the main
neutron connection so rollbacks from consistency hash operations don't
affect the parent sessions.
'''
_FACADE = None
def __init__(self, hash_id='1'):
if HashHandler._FACADE is None:
HashHandler._FACADE = session.EngineFacade.from_config(
cfg.CONF, sqlite_fk=True)
self.hash_id = hash_id
self.session = HashHandler._FACADE.get_session(autocommit=True,
expire_on_commit=False)
self.random_lock_id = ''.join(random.choice(string.ascii_uppercase
+ string.digits)
for _ in range(10))
self.lock_marker = 'LOCKED_BY[%s]' % self.random_lock_id
def _get_current_record(self):
with self.session.begin(subtransactions=True):
res = (self.session.query(ConsistencyHash).
filter_by(hash_id=self.hash_id).first())
if not res:
return ''
self.hash_db_obj = res
return res.hash
if res:
self.session.refresh(res) # make sure latest is loaded from db
return res
def _insert_empty_hash_with_lock(self):
# try to insert a new hash, return False on conflict
try:
with self.session.begin(subtransactions=True):
res = ConsistencyHash(hash_id=self.hash_id,
hash=self.lock_marker)
self.session.add(res)
return True
except db_exc.DBDuplicateEntry:
# another server created a new record at the same time
return False
def _optimistic_update_hash_record(self, old_record, new_hash):
# Optimistic update strategy. Returns True if successful, else False.
query = sa.update(ConsistencyHash.__table__).values(hash=new_hash)
query = query.where(ConsistencyHash.hash_id == old_record.hash_id)
query = query.where(ConsistencyHash.hash == old_record.hash)
with self._FACADE.get_engine().begin() as conn:
result = conn.execute(query)
# We need to check update row count in case another server is
# doing this at the same time. Only one will succeed, the other will
# not update any rows.
return result.rowcount != 0
def _get_lock_owner(self, record):
matches = re.findall("^LOCKED_BY\[(\w+)\]", record)
if not matches:
return None
return matches[0]
def read_for_update(self):
# An optimistic locking strategy with a timeout to avoid using a
# consistency hash while another server is using it. This will
# not return until a lock is acquired either normally or by stealing
# it after an individual ID holds it for greater than
# MAX_LOCK_WAIT_TIME.
lock_wait_start = None
last_lock_owner = None
while True:
res = self._get_current_record()
if not res:
# no current entry. try to insert to grab lock
if not self._insert_empty_hash_with_lock():
# A failed insert after missing current record means
# a concurrent insert occured. Start process over to
# find the new record.
LOG.debug("Concurrent record inserted. Retrying.")
time.sleep(0.25)
continue
# The empty hash was successfully inserted with our lock
return ''
current_lock_owner = self._get_lock_owner(res.hash)
if not current_lock_owner:
# no current lock. attempt to lock
new = self.lock_marker + res.hash
if not self._optimistic_update_hash_record(res, new):
# someone else beat us to it. restart process to wait
# for new lock ID to be removed
LOG.debug(
"Failed to acquire lock. Restarting lock wait. "
"Previous hash: %(prev)s. Attempted update: %(new)s" %
{'prev': res.hash, 'new': new})
time.sleep(0.25)
continue
# successfully got the lock
return res.hash
LOG.debug("This request's lock ID is %(this)s. "
"DB lock held by %(that)s" %
{'this': self.random_lock_id,
'that': current_lock_owner})
if current_lock_owner == self.random_lock_id:
# no change needed, we already have the table lock due to
# previous read_for_update call.
# return hash with lock tag stripped off for use in a header
return res.hash.replace(self.lock_marker, '')
if current_lock_owner != last_lock_owner:
# The owner changed since the last iteration, but it
# wasn't to us. Reset the counter. Log if not
# first iteration.
if lock_wait_start:
LOG.debug("Lock owner changed from %(old)s to %(new)s "
"while waiting to acquire it.",
{'old': last_lock_owner,
'new': current_lock_owner})
lock_wait_start = time.time()
last_lock_owner = current_lock_owner
if time.time() - lock_wait_start > MAX_LOCK_WAIT_TIME:
# the lock has been held too long, steal it
LOG.warning(_LW("Gave up waiting for consistency DB "
"lock, trying to take it. "
"Current hash is: %s"), res.hash)
new_db_value = res.hash.replace(current_lock_owner,
self.random_lock_id)
if self._optimistic_update_hash_record(res, new_db_value):
return res.hash.replace(new_db_value, '')
LOG.info(_LI("Failed to take lock. Another process updated "
"the DB first."))
def clear_lock(self):
LOG.debug("Clearing hash record lock of id %s" % self.random_lock_id)
with self.session.begin(subtransactions=True):
res = (self.session.query(ConsistencyHash).
filter_by(hash_id=self.hash_id).first())
if not res:
LOG.warning(_LW("Hash record already gone, no lock to clear."))
return
if not res.hash.startswith(self.lock_marker):
# if these are frequent the server is too slow
LOG.warning(_LW("Another server already removed the lock. %s"),
res.hash)
return
res.hash = res.hash.replace(self.lock_marker, '')
def put_hash(self, hash):
hash = hash or ''
with self.session.begin(subtransactions=True):
if self.hash_db_obj is not None:
self.hash_db_obj.hash = hash
res = (self.session.query(ConsistencyHash).
filter_by(hash_id=self.hash_id).first())
if res:
res.hash = hash
else:
conhash = ConsistencyHash(hash_id=self.hash_id, hash=hash)
self.session.merge(conhash)

View File

@ -40,7 +40,6 @@ from oslo.config import cfg
from oslo.serialization import jsonutils
from neutron.common import exceptions
from neutron.common import utils
from neutron.openstack.common import excutils
from neutron.openstack.common import log as logging
from neutron.plugins.bigswitch.db import consistency_db as cdb
@ -191,11 +190,17 @@ class ServerProxy(object):
# don't clear hash from DB if a hash header wasn't present
if hash_value is not None:
hash_handler.put_hash(hash_value)
else:
hash_handler.clear_lock()
try:
respdata = jsonutils.loads(respstr)
except ValueError:
# response was not JSON, ignore the exception
pass
else:
# release lock so others don't have to wait for timeout
hash_handler.clear_lock()
ret = (response.status, response.reason, respstr, respdata)
except httplib.HTTPException:
# If we were using a cached connection, try again with a new one.
@ -419,7 +424,6 @@ class ServerPool(object):
"""
return resp[0] in SUCCESS_CODES
@utils.synchronized('bsn-rest-call')
def rest_call(self, action, resource, data, headers, ignore_codes,
timeout=False):
context = self.get_context_ref()
@ -430,7 +434,7 @@ class ServerPool(object):
# backend controller
cdict.pop('auth_token', None)
headers[REQ_CONTEXT_HEADER] = jsonutils.dumps(cdict)
hash_handler = cdb.HashHandler(context=context)
hash_handler = cdb.HashHandler()
good_first = sorted(self.servers, key=lambda x: x.failed)
first_response = None
for active_server in good_first:

View File

@ -29,4 +29,5 @@ class BigSwitchDhcpAgentNotifierTestCase(
self.setup_config_files()
self.setup_patches()
super(BigSwitchDhcpAgentNotifierTestCase, self).setUp()
self.setup_db()
self.startHttpPatch()

View File

@ -21,6 +21,7 @@ from oslo.config import cfg
import neutron.common.test_lib as test_lib
from neutron.plugins.bigswitch import config
from neutron.plugins.bigswitch.db import consistency_db
from neutron.tests.unit.bigswitch import fake_server
@ -44,6 +45,7 @@ class BigSwitchTestBase(object):
test_lib.test_config['config_files'] = [os.path.join(etc_path,
'restproxy.ini.test')]
self.addCleanup(cfg.CONF.reset)
self.addCleanup(consistency_db.clear_db)
config.register_config()
# Only try SSL on SSL tests
cfg.CONF.set_override('server_ssl', False, 'RESTPROXY')
@ -68,3 +70,7 @@ class BigSwitchTestBase(object):
self.httpPatch = mock.patch(HTTPCON,
new=fake_server.HTTPConnectionMock)
self.httpPatch.start()
def setup_db(self):
# setup the db engine and models for the consistency db
consistency_db.setup_db()

View File

@ -48,6 +48,7 @@ class BigSwitchProxyPluginV2TestCase(test_base.BigSwitchTestBase,
service_plugins = {'L3_ROUTER_NAT': self._l3_plugin_name}
super(BigSwitchProxyPluginV2TestCase,
self).setUp(self._plugin_name, service_plugins=service_plugins)
self.setup_db()
self.port_create_status = 'BUILD'
self.startHttpPatch()

View File

@ -61,6 +61,7 @@ class DHCPOptsTestCase(test_base.BigSwitchTestBase,
self.setup_config_files()
super(test_extradhcp.ExtraDhcpOptDBTestCase,
self).setUp(plugin=self._plugin_name)
self.setup_db()
self.startHttpPatch()
@ -78,6 +79,7 @@ class RouterDBTestBase(test_base.BigSwitchTestBase,
super(RouterDBTestBase, self).setUp(plugin=self._plugin_name,
ext_mgr=ext_mgr,
service_plugins=service_plugins)
self.setup_db()
cfg.CONF.set_default('allow_overlapping_ips', False)
self.plugin_obj = manager.NeutronManager.get_service_plugins().get(
'L3_ROUTER_NAT')

View File

@ -30,6 +30,7 @@ class RestProxySecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase,
self.setup_patches()
self._attribute_map_bk_ = {}
super(RestProxySecurityGroupsTestCase, self).setUp(self.plugin_str)
self.setup_db()
plugin = manager.NeutronManager.get_plugin()
self.notifier = plugin.notifier
self.rpc = plugin.endpoints[0]

View File

@ -18,12 +18,13 @@ import ssl
import mock
from oslo.config import cfg
from oslo.db import exception as db_exc
from oslo.serialization import jsonutils
from neutron import context
from neutron import manager
from neutron.openstack.common import importutils
from neutron.plugins.bigswitch.db import consistency_db as cdb
from neutron.plugins.bigswitch.db import consistency_db
from neutron.plugins.bigswitch import servermanager
from neutron.tests.unit.bigswitch import test_restproxy_plugin as test_rp
@ -414,7 +415,7 @@ class ServerManagerTests(test_rp.BigSwitchProxyPluginV2TestCase):
def test_delete_failure_sets_bad_hash(self):
pl = manager.NeutronManager.get_plugin()
hash_handler = cdb.HashHandler()
hash_handler = consistency_db.HashHandler()
with mock.patch(
SERVERMANAGER + '.ServerProxy.rest_call',
return_value=(httplib.INTERNAL_SERVER_ERROR, 0, 0, 0)
@ -541,3 +542,126 @@ class TestSockets(test_rp.BigSwitchProxyPluginV2TestCase):
con = self.sm.HTTPSConnectionWithValidation('127.0.0.1', 0, timeout=1)
# if httpcon was created, a connect attempt should raise a socket error
self.assertRaises(socket.error, con.connect)
class HashLockingTests(test_rp.BigSwitchProxyPluginV2TestCase):
def _get_hash_from_handler_db(self, handler):
with handler.session.begin(subtransactions=True):
res = (handler.session.query(consistency_db.ConsistencyHash).
filter_by(hash_id=handler.hash_id).first())
return res.hash
def test_hash_handle_lock_no_initial_record(self):
handler = consistency_db.HashHandler()
h1 = handler.read_for_update()
# return to caller should be empty even with lock in DB
self.assertFalse(h1)
# db should have a lock marker
self.assertEqual(handler.lock_marker,
self._get_hash_from_handler_db(handler))
# an entry should clear the lock
handler.put_hash('DIGEST')
self.assertEqual('DIGEST', self._get_hash_from_handler_db(handler))
def test_hash_handle_lock_existing_record(self):
handler = consistency_db.HashHandler()
handler.put_hash('DIGEST') # set initial hash
h1 = handler.read_for_update()
self.assertEqual('DIGEST', h1)
self.assertEqual(handler.lock_marker + 'DIGEST',
self._get_hash_from_handler_db(handler))
# make sure update works
handler.put_hash('DIGEST2')
self.assertEqual('DIGEST2', self._get_hash_from_handler_db(handler))
def test_db_duplicate_on_insert(self):
handler = consistency_db.HashHandler()
with mock.patch.object(
handler.session, 'add', side_effect=[db_exc.DBDuplicateEntry, '']
) as add_mock:
handler.read_for_update()
# duplicate insert failure should result in retry
self.assertEqual(2, add_mock.call_count)
def test_update_hit_no_records(self):
handler = consistency_db.HashHandler()
# set initial hash so update will be required
handler.put_hash('DIGEST')
with mock.patch.object(handler._FACADE, 'get_engine') as ge:
conn = ge.return_value.begin.return_value.__enter__.return_value
firstresult = mock.Mock()
# a rowcount of 0 simulates the effect of another db client
# updating the same record the handler was trying to update
firstresult.rowcount = 0
secondresult = mock.Mock()
secondresult.rowcount = 1
conn.execute.side_effect = [firstresult, secondresult]
handler.read_for_update()
# update should have been called again after the failure
self.assertEqual(2, conn.execute.call_count)
def test_handler_already_holding_lock(self):
handler = consistency_db.HashHandler()
handler.read_for_update() # lock the table
with mock.patch.object(handler._FACADE, 'get_engine') as ge:
handler.read_for_update()
# get engine should not have been called because no update
# should have been made
self.assertFalse(ge.called)
def test_clear_lock(self):
handler = consistency_db.HashHandler()
handler.put_hash('SOMEHASH')
handler.read_for_update() # lock the table
self.assertEqual(handler.lock_marker + 'SOMEHASH',
self._get_hash_from_handler_db(handler))
handler.clear_lock()
self.assertEqual('SOMEHASH',
self._get_hash_from_handler_db(handler))
def test_clear_lock_skip_after_steal(self):
handler1 = consistency_db.HashHandler()
handler1.read_for_update() # lock the table
handler2 = consistency_db.HashHandler()
with mock.patch.object(consistency_db, 'MAX_LOCK_WAIT_TIME', new=0):
handler2.read_for_update()
before = self._get_hash_from_handler_db(handler1)
# handler1 should not clear handler2's lock
handler1.clear_lock()
self.assertEqual(before, self._get_hash_from_handler_db(handler1))
def test_take_lock_from_other(self):
handler1 = consistency_db.HashHandler()
handler1.read_for_update() # lock the table
handler2 = consistency_db.HashHandler()
with mock.patch.object(consistency_db, 'MAX_LOCK_WAIT_TIME') as mlock:
# make handler2 wait for only one iteration
mlock.__lt__.side_effect = [False, True]
handler2.read_for_update()
# once MAX LOCK exceeded, comparisons should stop due to lock steal
self.assertEqual(2, mlock.__lt__.call_count)
dbentry = self._get_hash_from_handler_db(handler1)
# handler2 should have the lock
self.assertIn(handler2.lock_marker, dbentry)
self.assertNotIn(handler1.lock_marker, dbentry)
# lock protection only blocks read_for_update, anyone can change
handler1.put_hash('H1')
def test_failure_to_steal_lock(self):
handler1 = consistency_db.HashHandler()
handler1.read_for_update() # lock the table
handler2 = consistency_db.HashHandler()
with contextlib.nested(
mock.patch.object(consistency_db, 'MAX_LOCK_WAIT_TIME'),
mock.patch.object(handler2, '_optimistic_update_hash_record',
side_effect=[False, True])
) as (mlock, oplock):
# handler2 will go through 2 iterations since the lock will fail on
# the first attempt
mlock.__lt__.side_effect = [False, True, False, True]
handler2.read_for_update()
self.assertEqual(4, mlock.__lt__.call_count)
self.assertEqual(2, oplock.call_count)

View File

@ -82,6 +82,7 @@ class test_ssl_certificate_base(test_plugin.NeutronDbPluginV2TestCase,
def setUp(self):
super(test_ssl_certificate_base, self).setUp(self.plugin_str)
self.setup_db()
class TestSslSticky(test_ssl_certificate_base):