Add framework for handling node requests
Each ProviderWorker will handle node requests and assign those to threads, represented by the new NodeRequestWorker class. A node request is locked before being passed off to a NodeRequestWorker which will mark it as PENDING, process it, and mark it as FULFILLED before releasing the lock. Change-Id: I529a9c6d94bbec1c14b95d12316b8d576e4c2183
This commit is contained in:
parent
d3a590417e
commit
22c35e0756
@ -671,7 +671,54 @@ class SubNodeLauncher(threading.Thread):
|
||||
return dt
|
||||
|
||||
|
||||
class NodeRequestWorker(threading.Thread):
    '''
    Class to process a single node request.

    The ProviderWorker thread will instantiate a class of this type for each
    node request that it pulls from ZooKeeper. That request will be assigned
    to this thread for it to process.

    The request is handed over already locked. This thread always attempts
    to release that lock before exiting, whether processing succeeded or
    failed.
    '''

    def __init__(self, zk, request):
        '''
        :param zk: Object used to talk to ZooKeeper. It must provide
            ``updateNodeRequest`` and ``unlockNodeRequest``.
        :param request: The (locked) node request to process. Its ``id``
            is used to build the thread name.
        '''
        threading.Thread.__init__(
            self, name='NodeRequestWorker.%s' % request.id
        )
        self.log = logging.getLogger("nodepool.%s" % self.name)
        # NOTE: the ``zk`` parameter shadows the module-level ``zk`` name
        # only within this method; the other methods read the state
        # constants (PENDING, FULFILLED, FAILED) from the module-level name.
        self.zk = zk
        self.request = request

    def run(self):
        '''
        Process the request, then release its lock.

        Any exception raised while processing is logged and the request is
        marked as FAILED. The unlock lives in ``finally`` so that a failure
        while recording the FAILED state cannot leave the lock held.
        '''
        self.log.debug("Handling request %s", self.request)
        try:
            self._run()
        except Exception:
            self.log.exception("Exception in NodeRequestWorker:")
            self._markFailed()
        finally:
            self._unlock()

    def _run(self):
        '''
        Move the request through PENDING to FULFILLED.

        Unlocking is left to run() so that it happens on every exit path.
        '''
        self.request.state = zk.PENDING
        self.zk.updateNodeRequest(self.request)

        # TODO(Shrews): Make magic happen here

        self.request.state = zk.FULFILLED
        self.zk.updateNodeRequest(self.request)

    def _markFailed(self):
        '''Best-effort attempt to record the request as FAILED.'''
        self.request.state = zk.FAILED
        try:
            self.zk.updateNodeRequest(self.request)
        except Exception:
            # The update itself may be what failed in the first place
            # (e.g. the request disappeared); do not let that prevent the
            # lock from being released.
            self.log.exception(
                "Unable to mark request %s as failed:", self.request)

    def _unlock(self):
        '''Release the request lock, logging rather than raising on error.'''
        try:
            self.zk.unlockNodeRequest(self.request)
        except Exception:
            self.log.exception(
                "Unable to unlock request %s:", self.request)
|
||||
|
||||
|
||||
class ProviderWorker(threading.Thread):
|
||||
'''
|
||||
Class that manages node requests for a single provider.
|
||||
|
||||
The NodePool thread will instantiate a class of this type for each
|
||||
provider found in the nodepool configuration file. If the provider to
|
||||
which this thread is assigned is removed from the configuration file, then
|
||||
that will be recognized and this thread will shut itself down.
|
||||
'''
|
||||
|
||||
def __init__(self, configfile, zk, provider):
|
||||
threading.Thread.__init__(
|
||||
self, name='ProviderWorker.%s' % provider.name
|
||||
@ -714,8 +761,32 @@ class ProviderWorker(threading.Thread):
|
||||
self.running = True
|
||||
|
||||
while self.running:
|
||||
self.log.debug("Getting job from ZK queue")
|
||||
# TODO(Shrews): Actually do queue work here
|
||||
self.log.debug("Getting node request from ZK queue")
|
||||
|
||||
for req_id in self.zk.getNodeRequests():
|
||||
req = self.zk.getNodeRequest(req_id)
|
||||
if not req:
|
||||
continue
|
||||
|
||||
# Only interested in unhandled requests
|
||||
if req.state != zk.REQUESTED:
|
||||
continue
|
||||
|
||||
try:
|
||||
self.zk.lockNodeRequest(req, blocking=False)
|
||||
except exceptions.ZKLockException:
|
||||
continue
|
||||
|
||||
# Make sure the state didn't change on us
|
||||
if req.state != zk.REQUESTED:
|
||||
self.zk.unlockNodeRequest(req)
|
||||
continue
|
||||
|
||||
# Got a lock, so assign it
|
||||
self.log.info("Assigning node request %s" % req.id)
|
||||
t = NodeRequestWorker(self.zk, req)
|
||||
t.start()
|
||||
|
||||
time.sleep(10)
|
||||
self._updateProvider()
|
||||
|
||||
|
@ -294,6 +294,7 @@ class NodeRequest(BaseModel):
|
||||
|
||||
def __init__(self, id=None):
    '''
    Create a node request that does not hold a lock.

    :param id: Identifier handed to the base class constructor
        (defaults to None).
    '''
    super(NodeRequest, self).__init__(id)
    # Lock object for this request. ZooKeeper.lockNodeRequest() stores the
    # acquired lock here and ZooKeeper.unlockNodeRequest() resets it to None.
    self.lock = None
|
||||
|
||||
def __repr__(self):
|
||||
d = self.toDict()
|
||||
@ -344,6 +345,7 @@ class ZooKeeper(object):
|
||||
IMAGE_ROOT = "/nodepool/images"
|
||||
LAUNCHER_ROOT = "/nodepool/launchers"
|
||||
REQUEST_ROOT = "/nodepool/requests"
|
||||
REQUEST_LOCK_ROOT = "/nodepool/requests-lock"
|
||||
|
||||
def __init__(self):
|
||||
'''
|
||||
@ -391,6 +393,9 @@ class ZooKeeper(object):
|
||||
def _requestPath(self, request):
|
||||
return "%s/%s" % (self.REQUEST_ROOT, request)
|
||||
|
||||
def _requestLockPath(self, request):
|
||||
return "%s/%s" % (self.REQUEST_LOCK_ROOT, request)
|
||||
|
||||
def _dictToStr(self, data):
|
||||
return json.dumps(data)
|
||||
|
||||
@ -1103,3 +1108,70 @@ class ZooKeeper(object):
|
||||
d = NodeRequest.fromDict(self._strToDict(data), request)
|
||||
d.stat = stat
|
||||
return d
|
||||
|
||||
def updateNodeRequest(self, request):
    '''
    Update a node request.

    The request must already be locked before updating.

    :param NodeRequest request: The node request to update.

    :raises: Exception if the request is not locked, or if the request
        node no longer exists in ZooKeeper.
    '''
    if request.lock is None:
        raise Exception("%s must be locked before updating." % request)

    path = self._requestPath(request.id)
    data = request.toDict()

    # Write directly and translate a missing node into the error, rather
    # than reading the node first: a separate existence check costs an
    # extra round trip and can still race with a delete that happens
    # between the check and the write.
    try:
        self.client.set(path, self._dictToStr(data))
    except kze.NoNodeError:
        raise Exception(
            "Attempt to update non-existing request %s" % request)
|
||||
|
||||
def lockNodeRequest(self, request, blocking=True, timeout=None):
    '''
    Acquire the lock for a node request.

    On success the acquired lock object is stored in the `lock`
    attribute of the request.

    :param NodeRequest request: The request to lock.
    :param bool blocking: Whether or not to block while trying to
        acquire the lock.
    :param int timeout: When blocking, how long to wait for the lock
        to be acquired. None, the default, waits forever.

    :raises: TimeoutException if the lock could not be acquired when
        blocking with a timeout. ZKLockException if we are not blocking
        and could not get the lock, or a lock is already held.
    '''
    lock_path = self._requestLockPath(request.id)
    try:
        request_lock = Lock(self.client, lock_path)
        acquired = request_lock.acquire(blocking, timeout)
    except kze.LockTimeout:
        raise npe.TimeoutException(
            "Timeout trying to acquire lock %s" % lock_path)

    if acquired:
        request.lock = request_lock
        return

    # A non-blocking attempt can come back empty-handed when someone else
    # currently holds the lock.
    raise npe.ZKLockException("Did not get lock on %s" % lock_path)
|
||||
|
||||
def unlockNodeRequest(self, request):
    '''
    Release the lock held on a node request.

    The request must already have been locked. After the lock is
    released, the `lock` attribute of the request is reset to None.

    :param NodeRequest request: The request to unlock.

    :raises: ZKLockException if the request is not currently locked.
    '''
    held = request.lock
    if held is None:
        raise npe.ZKLockException(
            "Request %s does not hold a lock" % request)

    held.release()
    request.lock = None
|
||||
|
Loading…
x
Reference in New Issue
Block a user