Serve all paused handlers before unpausing
The launcher implementation assumed that only one request handler will be paused at any given point in time. However, this is not true when e.g. the request handler accepts multiple requests that all run into a quota limit during launch. The consequence of this is that the pool is unpaused too early and we might accept other node requests until the provider is paused again. This could lead to a starvation of earlier paused handlers as they were fulfilled in a LIFO fashion. To fix this edge case we will store paused request handlers in a set and only unpause the provider when there are no paused handlers anymore. Paused handlers are now also run in priority order. Change-Id: Ia34e2844533ce9942d489838c4ce14a605d79287
This commit is contained in:
parent
46c5e27254
commit
0f1680be7e
@ -72,7 +72,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
||||
self.pool_name = pool_name
|
||||
self.running = False
|
||||
self.stop_event = threading.Event()
|
||||
self.paused_handler = None
|
||||
self.paused_handlers = set()
|
||||
self.request_handlers = []
|
||||
self.watermark_sleep = nodepool.watermark_sleep
|
||||
self.zk = self.getZK()
|
||||
@ -219,7 +219,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
||||
rh = pm.getRequestHandler(self, req)
|
||||
reasons_to_decline = rh.getDeclinedReasons()
|
||||
|
||||
if self.paused_handler and not reasons_to_decline:
|
||||
if self.paused_handlers and not reasons_to_decline:
|
||||
self.log.debug("Handler is paused, deferring request")
|
||||
continue
|
||||
|
||||
@ -275,7 +275,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
||||
label_quota[node_type] -= 1
|
||||
|
||||
if rh.paused:
|
||||
self.paused_handler = rh
|
||||
self.paused_handlers.add(rh)
|
||||
self.request_handlers.append(rh)
|
||||
|
||||
# if we exceeded the timeout stop iterating here
|
||||
@ -295,7 +295,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
||||
if not r.poll():
|
||||
active_handlers.append(r)
|
||||
if r.paused:
|
||||
self.paused_handler = r
|
||||
self.paused_handlers.add(r)
|
||||
else:
|
||||
log.debug("Removing request handler")
|
||||
except kze.SessionExpiredError:
|
||||
@ -435,17 +435,19 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
||||
self.updateTenantLimits(
|
||||
self.nodepool.config.tenant_resource_limits)
|
||||
|
||||
if not self.paused_handler:
|
||||
self.component_info.paused = False
|
||||
else:
|
||||
if self.paused_handlers:
|
||||
self.component_info.paused = True
|
||||
# If we are paused, one request handler could not
|
||||
# If we are paused, some request handlers could not
|
||||
# satisfy its assigned request, so give it
|
||||
# another shot. Unpause ourselves if it completed.
|
||||
self.paused_handler.run()
|
||||
if not self.paused_handler.paused:
|
||||
self.paused_handler = None
|
||||
self.component_info.paused = False
|
||||
# another shot. Unpause ourselves if all are completed.
|
||||
for rh in sorted(self.paused_handlers,
|
||||
key=lambda h: h.request.priority):
|
||||
rh.run()
|
||||
if not rh.paused:
|
||||
self.paused_handlers.remove(rh)
|
||||
|
||||
if not self.paused_handlers:
|
||||
self.component_info.paused = False
|
||||
|
||||
# Regardless of whether we are paused, run
|
||||
# assignHandlers. It will only accept requests if we
|
||||
@ -463,8 +465,9 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
||||
self.stop_event.wait(self.watermark_sleep)
|
||||
|
||||
# Cleanup on exit
|
||||
if self.paused_handler:
|
||||
self.paused_handler.unlockNodeSet(clear_allocation=True)
|
||||
if self.paused_handlers:
|
||||
for rh in self.paused_handlers:
|
||||
rh.unlockNodeSet(clear_allocation=True)
|
||||
|
||||
def stop(self):
|
||||
'''
|
||||
|
@ -261,7 +261,7 @@ class TestDriverAws(tests.DBTestCase):
|
||||
# second request.
|
||||
pool_worker = pool.getPoolWorkers('ec2-us-west-2')
|
||||
for _ in iterate_timeout(30, Exception, 'paused handler'):
|
||||
if pool_worker[0].paused_handler:
|
||||
if pool_worker[0].paused_handlers:
|
||||
break
|
||||
|
||||
# Release the first node so that the second can be fulfilled.
|
||||
@ -316,7 +316,7 @@ class TestDriverAws(tests.DBTestCase):
|
||||
# second request.
|
||||
pool_worker = pool.getPoolWorkers('ec2-us-west-2')
|
||||
for _ in iterate_timeout(30, Exception, 'paused handler'):
|
||||
if pool_worker[0].paused_handler:
|
||||
if pool_worker[0].paused_handlers:
|
||||
break
|
||||
|
||||
# Release the first node so that the second can be fulfilled.
|
||||
|
@ -263,7 +263,7 @@ class TestDriverKubernetes(tests.DBTestCase):
|
||||
if pause:
|
||||
# The previous request should pause the handler
|
||||
pool_worker = pool.getPoolWorkers('kubespray')
|
||||
while not pool_worker[0].paused_handler:
|
||||
while not pool_worker[0].paused_handlers:
|
||||
time.sleep(0.1)
|
||||
else:
|
||||
self.waitForNodeRequest(max_req, (zk.REQUESTED,))
|
||||
|
@ -235,7 +235,7 @@ class TestLauncher(tests.DBTestCase):
|
||||
# Wait until there is a paused request handler and check if there
|
||||
# are exactly two servers
|
||||
pool_worker = pool.getPoolWorkers('fake-provider')
|
||||
while not pool_worker[0].paused_handler:
|
||||
while not pool_worker[0].paused_handlers:
|
||||
time.sleep(0.1)
|
||||
self.assertEqual(len(client._server_list), 2)
|
||||
|
||||
@ -512,7 +512,7 @@ class TestLauncher(tests.DBTestCase):
|
||||
self.zk.storeNodeRequest(req2)
|
||||
|
||||
pool_worker = pool.getPoolWorkers('fake-provider')
|
||||
while not pool_worker[0].paused_handler:
|
||||
while not pool_worker[0].paused_handlers:
|
||||
time.sleep(0.1)
|
||||
|
||||
# The handler is paused now and the request should be in state PENDING
|
||||
@ -2080,7 +2080,7 @@ class TestLauncher(tests.DBTestCase):
|
||||
# causing request handling to pause.
|
||||
self.waitForNodeRequest(req, (zk.PENDING,))
|
||||
pool_worker = pool.getPoolWorkers('fake-provider')
|
||||
while not pool_worker[0].paused_handler:
|
||||
while not pool_worker[0].paused_handlers:
|
||||
time.sleep(0.1)
|
||||
self.assertTrue(mock_invalidatequotacache.called)
|
||||
|
||||
@ -2218,11 +2218,12 @@ class TestLauncher(tests.DBTestCase):
|
||||
|
||||
# Force an exception within the run handler.
|
||||
pool_worker = pool.getPoolWorkers('fake-provider')
|
||||
while not pool_worker[0].paused_handler:
|
||||
while not pool_worker[0].paused_handlers:
|
||||
time.sleep(0.1)
|
||||
pool_worker[0].paused_handler.hasProviderQuota = mock.Mock(
|
||||
side_effect=Exception('mock exception')
|
||||
)
|
||||
for rh in pool_worker[0].paused_handlers:
|
||||
rh.hasProviderQuota = mock.Mock(
|
||||
side_effect=Exception('mock exception')
|
||||
)
|
||||
|
||||
# The above exception should cause us to fail the paused request.
|
||||
req2 = self.waitForNodeRequest(req2, (zk.FAILED,))
|
||||
@ -2230,7 +2231,7 @@ class TestLauncher(tests.DBTestCase):
|
||||
|
||||
# The exception handling should make sure that we unpause AND remove
|
||||
# the request handler.
|
||||
while pool_worker[0].paused_handler:
|
||||
while pool_worker[0].paused_handlers:
|
||||
time.sleep(0.1)
|
||||
self.assertEqual(0, len(pool_worker[0].request_handlers))
|
||||
|
||||
@ -2321,7 +2322,7 @@ class TestLauncher(tests.DBTestCase):
|
||||
self.zk.storeNodeRequest(req2)
|
||||
|
||||
pool_worker = pool.getPoolWorkers('fake-provider')
|
||||
while not pool_worker[0].paused_handler:
|
||||
while not pool_worker[0].paused_handlers:
|
||||
time.sleep(0.1)
|
||||
|
||||
# The handler is paused now and the request should be in state PENDING
|
||||
@ -2413,6 +2414,60 @@ class TestLauncher(tests.DBTestCase):
|
||||
req = self.waitForNodeRequest(req)
|
||||
self.assertEqual(req.state, zk.FULFILLED)
|
||||
|
||||
def test_multiple_paused_requests(self):
|
||||
"""Test that multiple paused requests are fulfilled in order."""
|
||||
max_instances = 0
|
||||
|
||||
def fake_get_quota():
|
||||
nonlocal max_instances
|
||||
return (100, max_instances, 1000000)
|
||||
|
||||
self.useFixture(fixtures.MockPatchObject(
|
||||
fakeprovider.FakeProvider.fake_cloud, '_get_quota',
|
||||
fake_get_quota
|
||||
))
|
||||
|
||||
req1 = zk.NodeRequest()
|
||||
req1.state = zk.REQUESTED
|
||||
req1.node_types.append('fake-label')
|
||||
self.zk.storeNodeRequest(req1)
|
||||
|
||||
req2 = zk.NodeRequest()
|
||||
req2.state = zk.REQUESTED
|
||||
req2.node_types.append('fake-label')
|
||||
self.zk.storeNodeRequest(req2)
|
||||
|
||||
configfile = self.setup_config('ignore_provider_quota_true.yaml')
|
||||
self.useBuilder(configfile)
|
||||
self.waitForImage('fake-provider', 'fake-image')
|
||||
|
||||
pool = self.useNodepool(configfile, watermark_sleep=1)
|
||||
pool.start()
|
||||
|
||||
self.waitForAnyNodeInState(zk.ABORTED)
|
||||
|
||||
pool_worker = pool.getPoolWorkers('fake-provider')
|
||||
while not len(pool_worker[0].paused_handlers) == 2:
|
||||
time.sleep(0.1)
|
||||
|
||||
# Bump up the quota to allow the provider to allocate a node
|
||||
max_instances = 1
|
||||
req1 = self.waitForNodeRequest(req1)
|
||||
self.assertEqual(req1.state, zk.FULFILLED)
|
||||
|
||||
req2 = self.waitForNodeRequest(req2, zk.PENDING)
|
||||
|
||||
# Release the node allocated to the first request
|
||||
req1_node = self.zk.getNode(req1.nodes[0])
|
||||
self.zk.lockNode(req1_node, blocking=False)
|
||||
|
||||
req1_node.state = zk.USED
|
||||
self.zk.storeNode(req1_node)
|
||||
self.zk.unlockNode(req1_node)
|
||||
self.waitForNodeDeletion(req1_node)
|
||||
|
||||
self.waitForNodeRequest(req2, zk.FULFILLED)
|
||||
|
||||
def test_request_order(self):
|
||||
"""Test that requests are handled in sorted order"""
|
||||
configfile = self.setup_config('node_no_min_ready.yaml')
|
||||
|
Loading…
x
Reference in New Issue
Block a user