Merge "Fix relaunch attempts when hitting quota errors"
commit 03b7b4baef
@@ -300,6 +300,15 @@ class LabelRecorder(object):
         self.data.remove({'label': label, 'node_id': node_id})
         return node_id
 
+    def removeNode(self, id):
+        '''
+        Remove the node with the specified ID.
+        '''
+        for d in self.data:
+            if d['node_id'] == id:
+                self.data.remove(d)
+                return
+
 
 class NodeRequestHandlerNotifications(object):
     """
@@ -645,9 +654,18 @@ class NodeRequestHandler(NodeRequestHandlerNotifications,
             return False
 
         # Launches are complete, so populate ready_nodes and failed_nodes.
-        for node in self.nodeset:
+        aborted_nodes = []
+        for node in self.nodeset[:]:
             if node.state == zk.READY:
                 self.ready_nodes.append(node)
+            elif node.state == zk.ABORTED:
+                # ABORTED is a transient error triggered by overquota. In order
+                # to handle this gracefully don't count this as failed so the
+                # node is relaunched within this provider. Unlock the node so
+                # the DeletedNodeWorker cleans up the zNode.
+                aborted_nodes.append(node)
+                self.nodeset.remove(node)
+                self.zk.unlockNode(node)
             else:
                 self.failed_nodes.append(node)
 
@@ -674,6 +692,14 @@ class NodeRequestHandler(NodeRequestHandlerNotifications,
             self.log.debug("Declining node request %s because nodes failed",
                            self.request.id)
             self.decline_request()
+        elif aborted_nodes:
+            # Because nodes are added to the satisfied types list before they
+            # are ready we need to remove the aborted nodes again so they can
+            # be created again.
+            for node in aborted_nodes:
+                self._satisfied_types.removeNode(node.id)
+            self.paused = True
+            return False
         else:
             # The assigned nodes must be added to the request in the order
            # in which they were requested.
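
The two hunks above are the core of the fix: poll() now separates transient ABORTED nodes from genuinely failed ones, removes them from the node set and the satisfied-types bookkeeping, and pauses the handler instead of declining the request. A minimal, self-contained sketch of that control flow (simplified names, not the actual nodepool classes):

# Simplified sketch of the poll() behaviour introduced above; READY, FAILED
# and ABORTED mirror the zk state constants, but this is not nodepool code.
READY, FAILED, ABORTED = 'ready', 'failed', 'aborted'


class Node(object):
    def __init__(self, state):
        self.state = state


class Handler(object):
    def __init__(self, nodeset):
        self.nodeset = nodeset
        self.ready_nodes = []
        self.failed_nodes = []
        self.paused = False

    def poll(self):
        aborted_nodes = []
        # Iterate over a copy so aborted nodes can be removed in place.
        for node in self.nodeset[:]:
            if node.state == READY:
                self.ready_nodes.append(node)
            elif node.state == ABORTED:
                # Transient (e.g. overquota): not counted as failed, just
                # dropped so the node can be launched again later.
                aborted_nodes.append(node)
                self.nodeset.remove(node)
            else:
                self.failed_nodes.append(node)

        if self.failed_nodes:
            return 'declined'
        if aborted_nodes:
            # Pause; the pool worker retries the handler when quota frees up.
            self.paused = True
            return False
        return 'fulfilled'


handler = Handler([Node(READY), Node(ABORTED)])
print(handler.poll())        # False: paused, aborted node will be relaunched
print(len(handler.nodeset))  # 1: only the READY node remains in the nodeset
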
@@ -246,10 +246,12 @@ class OpenStackNodeLauncher(NodeLauncher):
                     self.zk.storeNode(self.node)
                 if attempts == self._retries:
                     raise
-                # Invalidate the quota cache if we encountered a quota error.
                 if 'quota exceeded' in str(e).lower():
+                    # A quota exception is not directly recoverable so bail
+                    # out immediately with a specific exception.
                     self.log.info("Quota exceeded, invalidating quota cache")
                     self.handler.manager.invalidateQuotaCache()
+                    raise exceptions.QuotaException("Quota exceeded")
                 attempts += 1
 
         self.node.state = zk.READY
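
With this hunk the retry loop no longer spends its remaining attempts on a condition that cannot improve: a provider error containing 'quota exceeded' is turned into a dedicated exception right away. A rough sketch of that pattern, using a hypothetical launch_instance callable and a QuotaError stand-in for the QuotaException added further down:

class QuotaError(Exception):
    """Stand-in for the QuotaException this change introduces."""


def launch_with_retries(launch_instance, retries=3):
    # launch_instance is a hypothetical callable representing the provider
    # launch; any exception mentioning 'quota exceeded' aborts immediately.
    attempts = 1
    while attempts <= retries:
        try:
            return launch_instance()
        except Exception as e:
            if attempts == retries:
                raise
            if 'quota exceeded' in str(e).lower():
                # Retrying cannot succeed until quota frees up, so bail out
                # with a specific exception the caller can react to.
                raise QuotaError("Quota exceeded")
            attempts += 1


def fake_launch():
    raise RuntimeError("Quota exceeded for instances")


try:
    launch_with_retries(fake_launch)
except QuotaError as e:
    print("aborted early:", e)
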
@@ -380,8 +382,8 @@ class OpenStackNodeRequestHandler(NodeRequestHandler):
        '''
        Check if all launch requests have completed.
 
-        When all of the Node objects have reached a final state (READY or
-        FAILED), we'll know all threads have finished the launch process.
+        When all of the Node objects have reached a final state (READY, FAILED
+        or ABORTED), we'll know all threads have finished the launch process.
        '''
        if not self._threads:
            return True
@@ -392,9 +394,10 @@ class OpenStackNodeRequestHandler(NodeRequestHandler):
 
         node_states = [node.state for node in self.nodeset]
 
-        # NOTE: It very important that NodeLauncher always sets one of
+        # NOTE: It's very important that NodeLauncher always sets one of
         # these states, no matter what.
-        if not all(s in (zk.READY, zk.FAILED) for s in node_states):
+        if not all(s in (zk.READY, zk.FAILED, zk.ABORTED)
+                   for s in node_states):
             return False
 
         return True
@@ -22,6 +22,7 @@ import time
 
 from kazoo import exceptions as kze
 
+from nodepool import exceptions
 from nodepool import stats
 from nodepool import zk
 
@@ -69,6 +70,16 @@ class NodeLauncher(threading.Thread,
                 self.node.id)
             self.node.state = zk.FAILED
             statsd_key = 'error.zksession'
+        except exceptions.QuotaException:
+            # We encountered a quota error when trying to launch a
+            # node. In this case we need to abort the launch. The upper
+            # layers will take care of this and reschedule a new node once
+            # the quota is ok again.
+            self.log.info("Aborting node %s due to quota failure" %
+                          self.node.id)
+            self.node.state = zk.ABORTED
+            self.zk.storeNode(self.node)
+            statsd_key = 'error.quota'
         except Exception as e:
             self.log.exception("Launch failed for node %s:", self.node.id)
             self.node.state = zk.FAILED
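
Together with the OpenStack hunk earlier, the launcher thread now resolves every launch attempt into exactly one terminal state that the handler inspects: READY on success, ABORTED on a quota problem, FAILED for everything else. A compact sketch of that mapping (hypothetical helper, not the nodepool API):

READY, FAILED, ABORTED = 'ready', 'failed', 'aborted'


class QuotaError(Exception):
    pass


def resolve_state(launch):
    """Run a launch callable and map its outcome onto a terminal node state."""
    try:
        launch()
    except QuotaError:
        # Transient: the handler relaunches the node once quota is available.
        return ABORTED
    except Exception:
        # Anything else counts as a failed launch attempt.
        return FAILED
    return READY


def quota_boom():
    raise QuotaError("Quota exceeded")


print(resolve_state(lambda: None))   # ready
print(resolve_state(quota_boom))     # aborted
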
@@ -45,6 +45,10 @@ class DibFailedError(BuilderError):
     pass
 
 
+class QuotaException(Exception):
+    pass
+
+
 class TimeoutException(Exception):
     pass
 
@@ -224,6 +224,8 @@ class PoolWorker(threading.Thread):
             try:
                 if not r.poll():
                     active_handlers.append(r)
+                    if r.paused:
+                        self.paused_handler = r
                 else:
                     self.log.debug("Removing handler for request %s",
                                    r.request.id)
@@ -662,7 +664,7 @@ class DeletedNodeWorker(BaseCleanupWorker):
        Delete instances from providers and nodes entries from ZooKeeper.
        '''
        cleanup_states = (zk.USED, zk.IN_USE, zk.BUILDING, zk.FAILED,
-                          zk.DELETING)
+                          zk.DELETING, zk.ABORTED)
 
        zk_conn = self._nodepool.getZK()
        for node in zk_conn.nodeIterator():
@@ -297,32 +297,35 @@ class TestLauncher(tests.DBTestCase):
         # (according to nodepool's quota estimate) fails.
         client.max_instances = 1
 
-        # Request a second node; this request should fail.
+        # Request a second node; this request should pause the handler.
         req2 = zk.NodeRequest()
         req2.state = zk.REQUESTED
         req2.node_types.append('fake-label')
         self.log.debug("Adding second request")
         self.zk.storeNodeRequest(req2)
-        req2 = self.waitForNodeRequest(req2)
-        self.assertEqual(req2.state, zk.FAILED)
 
-        # After the second request failed, the internal quota estimate
-        # should be reset, so the next request should pause to wait
-        # for more quota to become available.
-        req3 = zk.NodeRequest()
-        req3.state = zk.REQUESTED
-        req3.node_types.append('fake-label')
-        self.log.debug("Adding third request")
-        self.zk.storeNodeRequest(req3)
-        req3 = self.waitForNodeRequest(req3, (zk.PENDING,))
-        self.assertEqual(req3.state, zk.PENDING)
-
-        # Wait until there is a paused request handler and verify that
-        # there is still only one server built (from the first
-        # request).
         pool_worker = pool.getPoolWorkers('fake-provider')
         while not pool_worker[0].paused_handler:
+            # self.log.debug("tick")
             time.sleep(0.1)
+        self.log.debug("finished waiting")
+
+        # The handler is paused now and the request should be in state PENDING
+        req2 = self.waitForNodeRequest(req2, zk.PENDING)
+        self.assertEqual(req2.state, zk.PENDING)
+
+        # Now free up the first node
+        self.log.debug("Marking first node as used %s", req1.id)
+        req1_node.state = zk.USED
+        self.zk.storeNode(req1_node)
+        self.zk.unlockNode(req1_node)
+        self.waitForNodeDeletion(req1_node)
+
+        # After the first node is cleaned up the second request should be
+        # able to fulfill now.
+        req2 = self.waitForNodeRequest(req2)
+        self.assertEqual(req2.state, zk.FULFILLED)
+
         self.assertEqual(len(client._server_list), 1)
 
     def test_fail_request_on_launch_failure(self):
@@ -52,6 +52,9 @@ USED = 'used'
 HOLD = 'hold'
 # Initial node state
 INIT = 'init'
+# Aborted due to a transient error like overquota that should not count as a
+# failed launch attempt
+ABORTED = 'aborted'
 
 
 # NOTE(Shrews): Importing this from nodepool.config causes an import error
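
Because ABORTED joins VALID_STATES in the next hunk, it passes the same state validation as the existing lifecycle states. A hedged sketch of that style of guard (simplified; the real Node model carries many more fields):

BUILDING, READY, FAILED, ABORTED = 'building', 'ready', 'failed', 'aborted'
VALID_STATES = set([BUILDING, READY, FAILED, ABORTED])


class Node(object):
    def __init__(self):
        self._state = BUILDING

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value):
        # Reject anything outside the known lifecycle; adding ABORTED to
        # VALID_STATES is what allows the new state to be stored at all.
        if value not in VALID_STATES:
            raise TypeError("'%s' is not a valid state" % value)
        self._state = value


n = Node()
n.state = ABORTED
print(n.state)   # aborted
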
@@ -484,7 +487,7 @@ class Node(BaseModel):
    Class representing a launched node.
    '''
    VALID_STATES = set([BUILDING, TESTING, READY, IN_USE, USED,
-                        HOLD, DELETING, FAILED, INIT])
+                        HOLD, DELETING, FAILED, INIT, ABORTED])
 
    def __init__(self, id=None):
        super(Node, self).__init__(id)