From 22c35e0756631104a9d44a9e808db92e4f4eba85 Mon Sep 17 00:00:00 2001
From: David Shrewsbury
Date: Tue, 10 Jan 2017 16:42:32 -0500
Subject: [PATCH] Add framework for handling node requests

Each ProviderWorker will handle node requests and assign those to
threads, represented by the new NodeRequestWorker class. A node request
is locked before being passed off to a NodeRequestWorker which will
mark it as PENDING, process it, and mark it as FULFILLED (or FAILED if
processing raises) before releasing the lock. The lock is released on
every exit path of the worker thread.

Change-Id: I529a9c6d94bbec1c14b95d12316b8d576e4c2183
---
 nodepool/nodepool.py | 111 +++++++++++++++++++++++++++++++++++++++++++++--
 nodepool/zk.py       |  72 ++++++++++++++++++++++++++++++
 2 files changed, 181 insertions(+), 2 deletions(-)

diff --git a/nodepool/nodepool.py b/nodepool/nodepool.py
index 491afc5be..2c080c849 100644
--- a/nodepool/nodepool.py
+++ b/nodepool/nodepool.py
@@ -671,7 +671,87 @@ class SubNodeLauncher(threading.Thread):
         return dt


+class NodeRequestWorker(threading.Thread):
+    '''
+    Class to process a single node request.
+
+    The ProviderWorker thread will create an instance of this class for each
+    node request that it pulls from ZooKeeper. That request will be assigned
+    to this thread for it to process.
+
+    The request must already be locked when it is handed to this thread.
+    The lock is always released before the thread exits, whether or not
+    the request was processed successfully.
+    '''
+
+    def __init__(self, zk, request):
+        threading.Thread.__init__(
+            self, name='NodeRequestWorker.%s' % request.id
+        )
+        self.log = logging.getLogger("nodepool.%s" % self.name)
+        self.zk = zk
+        self.request = request
+
+    def run(self):
+        self.log.debug("Handling request %s" % self.request)
+        try:
+            self._run()
+        except Exception:
+            self.log.exception("Exception in NodeRequestWorker:")
+            self._failRequest()
+        finally:
+            self._releaseRequest()
+
+    def _run(self):
+        '''
+        Move the request through its states.
+
+        The lock is not released here; run() does that on every exit path.
+        '''
+        self.request.state = zk.PENDING
+        self.zk.updateNodeRequest(self.request)
+
+        # TODO(Shrews): Make magic happen here
+
+        self.request.state = zk.FULFILLED
+        self.zk.updateNodeRequest(self.request)
+
+    def _failRequest(self):
+        '''
+        Record the request as FAILED without raising.
+
+        Recording the failure can itself fail (for example, when the
+        request has been deleted), and that must not stop run() from
+        releasing the lock.
+        '''
+        try:
+            self.request.state = zk.FAILED
+            self.zk.updateNodeRequest(self.request)
+        except Exception:
+            self.log.exception(
+                "Unable to mark request %s as failed:" % self.request.id)
+
+    def _releaseRequest(self):
+        '''
+        Release the lock on the request without raising.
+        '''
+        try:
+            self.zk.unlockNodeRequest(self.request)
+        except Exception:
+            self.log.exception(
+                "Unable to unlock request %s:" % self.request.id)
+
+
 class ProviderWorker(threading.Thread):
+    '''
+    Class that manages node requests for a single provider.
+
+    The NodePool thread will create an instance of this class for each
+    provider found in the nodepool configuration file. If the provider to
+    which this thread is assigned is removed from the configuration file, then
+    that will be recognized and this thread will shut itself down.
+    '''
+
     def __init__(self, configfile, zk, provider):
         threading.Thread.__init__(
             self, name='ProviderWorker.%s' % provider.name
@@ -714,8 +794,35 @@ class ProviderWorker(threading.Thread):
         self.running = True

         while self.running:
-            self.log.debug("Getting job from ZK queue")
-            # TODO(Shrews): Actually do queue work here
+            self.log.debug("Getting node request from ZK queue")
+
+            for req_id in self.zk.getNodeRequests():
+                req = self.zk.getNodeRequest(req_id)
+                if not req:
+                    continue
+
+                # Only interested in unhandled requests
+                if req.state != zk.REQUESTED:
+                    continue
+
+                try:
+                    self.zk.lockNodeRequest(req, blocking=False)
+                except exceptions.ZKLockException:
+                    continue
+
+                # Make sure the state didn't change on us. The copy read
+                # before locking may be stale, so fetch it again now that
+                # we hold the lock.
+                latest = self.zk.getNodeRequest(req_id)
+                if not latest or latest.state != zk.REQUESTED:
+                    self.zk.unlockNodeRequest(req)
+                    continue
+
+                # Got a lock, so assign it
+                self.log.info("Assigning node request %s" % req.id)
+                t = NodeRequestWorker(self.zk, req)
+                t.start()
+
             time.sleep(10)
             self._updateProvider()

diff --git a/nodepool/zk.py b/nodepool/zk.py
index ca34fdef8..6ded8be27 100644
--- a/nodepool/zk.py
+++ b/nodepool/zk.py
@@ -294,6 +294,7 @@ class NodeRequest(BaseModel):

     def __init__(self, id=None):
         super(NodeRequest, self).__init__(id)
+        self.lock = None

     def __repr__(self):
         d = self.toDict()
@@ -344,6 +345,7 @@ class ZooKeeper(object):
     IMAGE_ROOT = "/nodepool/images"
     LAUNCHER_ROOT = "/nodepool/launchers"
     REQUEST_ROOT = "/nodepool/requests"
+    REQUEST_LOCK_ROOT = "/nodepool/requests-lock"

     def __init__(self):
         '''
@@ -391,6 +393,9 @@ class ZooKeeper(object):
     def _requestPath(self, request):
         return "%s/%s" % (self.REQUEST_ROOT, request)

+    def _requestLockPath(self, request):
+        return "%s/%s" % (self.REQUEST_LOCK_ROOT, request)
+
     def _dictToStr(self, data):
         return json.dumps(data)

@@ -1103,3 +1108,70 @@ class ZooKeeper(object):
         d = NodeRequest.fromDict(self._strToDict(data), request)
         d.stat = stat
         return d
+
+    def updateNodeRequest(self, request):
+        '''
+        Update a node request.
+
+        The request must already be locked before updating.
+
+        :param NodeRequest request: The node request to update.
+        '''
+        if request.lock is None:
+            raise Exception("%s must be locked before updating." % request)
+
+        # Validate it still exists before updating
+        if not self.getNodeRequest(request.id):
+            raise Exception(
+                "Attempt to update non-existing request %s" % request)
+
+        path = self._requestPath(request.id)
+        data = request.toDict()
+        self.client.set(path, self._dictToStr(data))
+
+    def lockNodeRequest(self, request, blocking=True, timeout=None):
+        '''
+        Lock a node request.
+
+        This will set the `lock` attribute of the request object when the
+        lock is successfully acquired.
+
+        :param NodeRequest request: The request to lock.
+        :param bool blocking: Whether or not to block on trying to
+            acquire the lock
+        :param int timeout: When blocking, how long to wait for the lock
+            to get acquired. None, the default, waits forever.
+
+        :raises: TimeoutException if we failed to acquire the lock when
+            blocking with a timeout. ZKLockException if we are not blocking
+            and could not get the lock, or a lock is already held.
+        '''
+        path = self._requestLockPath(request.id)
+        try:
+            lock = Lock(self.client, path)
+            have_lock = lock.acquire(blocking, timeout)
+        except kze.LockTimeout:
+            raise npe.TimeoutException(
+                "Timeout trying to acquire lock %s" % path)
+
+        # If we aren't blocking, it's possible we didn't get the lock
+        # because someone else has it.
+        if not have_lock:
+            raise npe.ZKLockException("Did not get lock on %s" % path)
+
+        request.lock = lock
+
+    def unlockNodeRequest(self, request):
+        '''
+        Unlock a node request.
+
+        The request must already have been locked.
+
+        :param NodeRequest request: The request to unlock.
+
+        :raises: ZKLockException if the request is not currently locked.
+        '''
+        if request.lock is None:
+            raise npe.ZKLockException("Request %s does not hold a lock" % request)
+        request.lock.release()
+        request.lock = None