Cause providers to continue to decline requests when at quota

When a provider is at quota, we pause it, and paused means paused:
we stop processing any other requests entirely.

Unfortunately, that includes requests that the given provider can't
even handle.  If a provider pauses because it is at quota while other
providers continue to operate, and a request arrives for a node type
that no provider can handle, that request will remain outstanding
until the paused provider becomes unpaused and can decline it.

Requests shouldn't have to wait that long to be declined by providers
that can never, under any circumstances, handle them.  To address
this, we now run the assignHandlers method whether we are paused or
not.  Within assignHandlers, we process every request regardless of
whether we are paused (though we don't necessarily accept it yet): we
decide whether the request will be declined, and if it will be, we
decline it even while paused.  Only if we are unpaused and do not
expect to decline the request do we accept it.

Change-Id: Ied9e4577670ea65b1d5ecfef95a7f837a7b6ac61
James E. Blair 2022-09-12 16:25:56 -07:00
parent f670c53a56
commit f31a0dadf8
3 changed files with 136 additions and 67 deletions
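The decision order described in the commit message can be summarized
with a minimal sketch (hypothetical helper and attribute names; the
actual implementation is PoolWorker._assignHandlers in the diff below,
which also handles locking, quota, and concurrency limits):

    def assign_handlers(self, requests):
        # Sketch only: shows the paused/decline ordering, not the real
        # nodepool code path.
        for req in requests:
            handler = self.getRequestHandler(req)   # assumed helper
            reasons = handler.getDeclinedReasons()
            if self.paused_handler and not reasons:
                # Paused and the request is otherwise acceptable:
                # defer it until we unpause.
                continue
            if reasons:
                # Declining needs no capacity, so do it even while
                # paused; the request shouldn't wait for us to unpause.
                handler.declineRequest()
            else:
                # Unpaused and no reason to decline: accept it.
                handler.run()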


@@ -379,6 +379,10 @@ class NodeRequestHandler(NodeRequestHandlerNotifications,
self.pool = self.pw.getPoolConfig()
self.zk = self.pw.getZK()
self.manager = self.pw.getProviderManager()
# We need the launcher_id attr
self.log = get_annotated_logger(logging.getLogger(
"nodepool.driver.NodeRequestHandler[%s]" % self.launcher_id),
event_id=self.request.event_id, node_request_id=self.request.id)
@property
def failed_nodes(self):
@@ -489,8 +493,7 @@ class NodeRequestHandler(NodeRequestHandlerNotifications,
self.log.debug(
"Declining node request because provider cannot"
" satisfy min-ready")
self.decline_request()
self._declinedHandlerCleanup()
self.declineRequest()
return
self.log.info(
@@ -539,22 +542,12 @@ class NodeRequestHandler(NodeRequestHandlerNotifications,
self._satisfied_types.add(ntype, node.id)
self.launch(node)
def _runHandler(self):
def getDeclinedReasons(self):
'''
Main body for the node request handling.
Return a list of reasons to decline this request
'''
self._setFromPoolWorker()
if self.provider is None or self.pool is None:
# If the config changed out from underneath us, we could now be
# an invalid provider and should stop handling this request.
raise Exception("Provider configuration missing")
# We have the launcher_id attr after _setFromPoolWorker() is called.
self.log = get_annotated_logger(logging.getLogger(
"nodepool.driver.NodeRequestHandler[%s]" % self.launcher_id),
event_id=self.request.event_id, node_request_id=self.request.id)
if not hasattr(self, 'provider'):
self._setFromPoolWorker()
declined_reasons = []
invalid_types = self._invalidNodeTypes()
@@ -567,12 +560,24 @@ class NodeRequestHandler(NodeRequestHandlerNotifications,
declined_reasons.append('images are not available')
elif not self.hasProviderQuota(self.request.node_types):
declined_reasons.append('it would exceed quota')
return declined_reasons
def _runHandler(self):
'''
Main body for the node request handling.
'''
self._setFromPoolWorker()
if self.provider is None or self.pool is None:
# If the config changed out from underneath us, we could now be
# an invalid provider and should stop handling this request.
raise Exception("Provider configuration missing")
declined_reasons = self.getDeclinedReasons()
if declined_reasons:
self.log.info("Declining node request because %s",
', '.join(declined_reasons))
self.decline_request()
self._declinedHandlerCleanup()
self.declineRequest()
return
if self.paused:
@@ -584,6 +589,10 @@ class NodeRequestHandler(NodeRequestHandlerNotifications,
self._waitForNodeSet()
def declineRequest(self):
self._declineRequest()
self._declinedHandlerCleanup()
def _declinedHandlerCleanup(self):
"""
After declining a request, do necessary cleanup actions.
@@ -635,7 +644,7 @@ class NodeRequestHandler(NodeRequestHandlerNotifications,
self.nodeset = []
def decline_request(self):
def _declineRequest(self):
# Technically, this check to see if we've already declined it should
# not be necessary. But if there is a bug (and there has been), we
# want to make sure we don't continuously grow this array.
@@ -664,8 +673,7 @@ class NodeRequestHandler(NodeRequestHandlerNotifications,
self.log.exception(
"Declining node request due to exception in "
"NodeRequestHandler:")
self.decline_request()
self._declinedHandlerCleanup()
self.declineRequest()
def poll(self):
if self.paused:
@@ -711,7 +719,8 @@ class NodeRequestHandler(NodeRequestHandlerNotifications,
if self.failed_nodes:
self.log.debug("Declining node request because nodes failed")
self.decline_request()
self._declineRequest()
# We perform our own cleanup
elif aborted_nodes:
# Because nodes are added to the satisfied types list before they
# are ready we need to remove the aborted nodes again so they can


@@ -147,24 +147,6 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
if not self.running:
return True
if self.paused_handler:
self.log.debug("Handler is now paused")
return True
# Get active threads for all pools for this provider
active_threads = sum([
w.activeThreads() for
w in self.nodepool.getPoolWorkers(self.provider_name)
])
# Short-circuit for limited request handling
if (provider.max_concurrency > 0 and
active_threads >= provider.max_concurrency):
self.log.debug("Request handling limited: %s active threads ",
"with max concurrency of %s",
active_threads, provider.max_concurrency)
return True
req = self.zk.getNodeRequest(req.id)
if not req:
continue
@@ -227,10 +209,38 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
if check_tenant_quota and not self._hasTenantQuota(req, pm):
# Defer request for it to be handled and fulfilled at a later
# run.
log.debug("Deferring request %s because it would "
"exceed tenant quota", req)
log.debug("Deferring request because it would "
"exceed tenant quota")
continue
# Get a request handler to help decide whether we should
# accept the request, but we're still not sure yet. We
# must lock the request before calling .run().
rh = pm.getRequestHandler(self, req)
reasons_to_decline = rh.getDeclinedReasons()
if self.paused_handler and not reasons_to_decline:
self.log.debug("Handler is paused, deferring request")
continue
# At this point, we are either unpaused, or we know we
# will decline the request.
if not reasons_to_decline:
# Get active threads for all pools for this provider
active_threads = sum([
w.activeThreads() for
w in self.nodepool.getPoolWorkers(self.provider_name)
])
# Short-circuit for limited request handling
if (provider.max_concurrency > 0 and
active_threads >= provider.max_concurrency):
self.log.debug("Request handling limited: %s "
"active threads ",
"with max concurrency of %s",
active_threads, provider.max_concurrency)
continue
log.debug("Locking request")
try:
self.zk.lockNodeRequest(req, blocking=False)
@@ -244,11 +254,14 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
log.debug("Request is in state %s", req.state)
continue
# Got a lock, so assign it
log.info("Assigning node request %s" % req)
rh = pm.getRequestHandler(self, req)
rh.run()
if not reasons_to_decline:
# Got a lock, so assign it
log.info("Assigning node request %s" % req)
rh.run()
else:
log.info("Declining node request %s" % req)
rh.declineRequest()
continue
if has_quota_support:
# Adjust the label quota so we don't accept more requests
@@ -424,12 +437,6 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
if not self.paused_handler:
self.component_info.paused = False
while not self._assignHandlers():
# _assignHandlers can take quite some time on a busy
# system so sprinkle _removeCompletedHandlers in
# between such that we have a chance to fulfill
# requests that already have all nodes.
self._removeCompletedHandlers()
else:
self.component_info.paused = True
# If we are paused, one request handler could not
@@ -438,7 +445,18 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
self.paused_handler.run()
if not self.paused_handler.paused:
self.paused_handler = None
self.component_info.paused = False
# Regardless of whether we are paused, run
# assignHandlers. It will only accept requests if we
# are unpaused, otherwise it will only touch requests
# we intend to decline.
while not self._assignHandlers():
# _assignHandlers can take quite some time on a busy
# system so sprinkle _removeCompletedHandlers in
# between such that we have a chance to fulfill
# requests that already have all nodes.
self._removeCompletedHandlers()
self._removeCompletedHandlers()
except Exception:
self.log.exception("Error in PoolWorker:")


@@ -405,6 +405,57 @@ class TestLauncher(tests.DBTestCase):
max_instances=math.inf,
max_ram=2 * 8192)
def test_decline_at_quota(self):
'''test that a provider at quota continues to decline requests'''
# patch the cloud with requested quota
def fake_get_quota():
return (math.inf, 1, math.inf)
self.useFixture(fixtures.MockPatchObject(
fakeprovider.FakeProvider.fake_cloud, '_get_quota',
fake_get_quota
))
configfile = self.setup_config('node_quota_tenant_instances.yaml')
self.useBuilder(configfile)
self.waitForImage('fake-provider', 'fake-image')
nodepool.launcher.LOCK_CLEANUP = 1
pool = self.useNodepool(configfile, watermark_sleep=1)
pool.start()
self.wait_for_config(pool)
req1 = zk.NodeRequest()
req1.state = zk.REQUESTED
req1.node_types.append('fake-label')
self.zk.storeNodeRequest(req1)
self.log.debug("Waiting for 1st request %s", req1.id)
req1 = self.waitForNodeRequest(req1, (zk.FULFILLED,))
self.assertEqual(len(req1.nodes), 1)
# Mark the first request's nodes as in use so they won't be deleted
# when we pause. Locking them is enough.
req1_node1 = self.zk.getNode(req1.nodes[0])
self.zk.lockNode(req1_node1, blocking=False)
req2 = zk.NodeRequest()
req2.state = zk.REQUESTED
req2.node_types.append('fake-label')
self.zk.storeNodeRequest(req2)
self.log.debug("Waiting for 2nd request %s", req2.id)
req2 = self.waitForNodeRequest(req2, (zk.PENDING,))
req3 = zk.NodeRequest()
req3.state = zk.REQUESTED
req3.node_types.append('invalid-label')
self.zk.storeNodeRequest(req3)
self.log.debug("Waiting for 3rd request %s", req3.id)
req3 = self.waitForNodeRequest(req3, (zk.FAILED,))
# Make sure req2 is still pending.
req2 = self.waitForNodeRequest(req2, (zk.PENDING,))
def test_over_quota(self, config='node_quota_cloud.yaml'):
'''
This tests what happens when a cloud unexpectedly returns an
@@ -2405,17 +2456,6 @@ class TestLauncher(tests.DBTestCase):
pool.start()
self.wait_for_config(pool)
# Get the list of request handlers to assert the correct processing
# order. The timestamps of the requests won't work for that as the
# request to be rejected will be handled faster most of the time.
while True:
workers = pool.getPoolWorkers('fake-provider')
if workers:
pool_worker = workers[0]
break
time.sleep(0.1)
request_handlers = pool_worker.request_handlers
# Request with a higher relative priority coming in first, but
# requesting a label that is not available.
req1 = zk.NodeRequest()
@@ -2437,10 +2477,12 @@ class TestLauncher(tests.DBTestCase):
req1 = self.waitForNodeRequest(req1)
self.assertEqual(req1.state, zk.FAILED)
# Verify that we created the node for req2 before we declined
# req1. This asserts that the requests were processed in the
# correct (reversed) order.
req2_node = self.zk.getNode(req2.nodes[0], cached=False)
self.assertGreater(req1.stat.mtime, req2_node.stat.ctime)
self.assertGreater(req2.id, req1.id)
self.assertEqual(len(request_handlers), 2)
self.assertEqual(request_handlers[0].request.id, req2.id)
self.assertEqual(request_handlers[1].request.id, req1.id)
def test_empty_node_deleted(self):
"""Test that empty nodes are deleted by the cleanup thread"""