From 1fa3ee77791f7c5dd4faa38a2a8012662f6e88a8 Mon Sep 17 00:00:00 2001 From: Gregory Haynes Date: Tue, 26 Jan 2016 05:23:06 +0000 Subject: [PATCH] Cleanup builder jobs and add disconnect support If nodepool disconnects from the gear server then all outstanding jobs are removed. Before this change we would not detect the disconnect and would wait forever for the outstanding jobs to complete. Moved the job tracking into job subclasses to make the state machine clearer. Change-Id: Ifaf6522ba54156a2915bab9416eefa7bd14fe70b Depends-On: If5352a373d5fa61dd1ee661f4a37976b3447dd9d --- nodepool/jobs.py | 168 ++++++++++++++++++++++++++++++++ nodepool/nodepool.py | 129 ++++-------------------- nodepool/tests/__init__.py | 10 +- nodepool/tests/test_nodepool.py | 27 ++++- 4 files changed, 219 insertions(+), 115 deletions(-) create mode 100644 nodepool/jobs.py diff --git a/nodepool/jobs.py b/nodepool/jobs.py new file mode 100644 index 000000000..6900f1cd8 --- /dev/null +++ b/nodepool/jobs.py @@ -0,0 +1,168 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import json +import logging +import uuid +import threading + +import gear + +import nodedb + + +class WatchableJob(gear.Job): + def __init__(self, *args, **kwargs): + super(WatchableJob, self).__init__(*args, **kwargs) + self._completion_handlers = [] + self._event = threading.Event() + + def _handleCompletion(self, mode=None): + self._event.set() + for handler in self._completion_handlers: + handler(self) + + def addCompletionHandler(self, handler): + self._completion_handlers.append(handler) + + def onCompleted(self): + self._handleCompletion() + + def onFailed(self): + self._handleCompletion() + + def onDisconnect(self): + self._handleCompletion() + + def onWorkStatus(self): + pass + + def waitForCompletion(self, timeout=None): + return self._event.wait(timeout) + + +class NodepoolJob(WatchableJob): + def __init__(self, job_name, job_data_obj, nodepool): + job_uuid = str(uuid.uuid4().hex) + job_data = json.dumps(job_data_obj) + super(NodepoolJob, self).__init__(job_name, job_data, job_uuid) + self.nodepool = nodepool + + def getDbSession(self): + return self.nodepool.getDB().getSession() + + +class ImageBuildJob(NodepoolJob): + log = logging.getLogger("jobs.ImageBuildJob") + + def __init__(self, image_name, image_id, nodepool): + self.image_id = image_id + job_data = {'image-id': str(image_id)} + job_name = 'image-build:%s' % image_name + super(ImageBuildJob, self).__init__(job_name, job_data, nodepool) + + def _deleteImage(self, record_only=False): + with self.getDbSession() as session: + self.log.debug('DIB Image %s (id %d) failed to build. 
Deleting.', + self.name.split(':', 1)[0], self.image_id) + dib_image = session.getDibImage(self.image_id) + # TODO:greghaynes remove this check and only call deleteDibImage + # when it is smart enough that it doesn't submit a delete job + if not record_only: + self.nodepool.deleteDibImage(dib_image) + else: + dib_image.delete() + + def onCompleted(self): + try: + with self.getDbSession() as session: + dib_image = session.getDibImage(self.image_id) + if dib_image is None: + self.log.error( + 'Unable to find matching dib_image for image_id %s', + self.image_id) + return + dib_image.state = nodedb.READY + session.commit() + self.log.debug('DIB Image %s (id %d) is ready', + self.name.split(':', 1)[0], self.image_id) + finally: + super(ImageBuildJob, self).onCompleted() + + def onFailed(self): + try: + self.log.error('DIB Image %s (id %d) failed to build. Deleting.', + self.name.split(':', 1)[0], self.image_id) + self._deleteImage(True) + finally: + super(ImageBuildJob, self).onFailed() + + def onDisconnect(self): + try: + self.log.error('DIB Image %s (id %d) failed due to gear disconnect.', + self.name.split(':', 1)[0], self.image_id) + self._deleteImage() + finally: + super(ImageBuildJob, self).onDisconnect() + + +class ImageUploadJob(NodepoolJob): + log = logging.getLogger("jobs.ImageUploadJob") + + def __init__(self, image_id, provider_name, external_name, snap_image_id, + nodepool): + self.image_id = image_id + self.snap_image_id = snap_image_id + job_data = { + 'image-name': external_name, + 'provider': provider_name + } + job_name = 'image-upload:%s' % image_id + super(ImageUploadJob, self).__init__(job_name, job_data, nodepool) + + def onCompleted(self): + try: + job_data = json.loads(self.data[0]) + external_id = job_data['external-id'] + + with self.getDbSession() as session: + snap_image = session.getSnapshotImage(self.snap_image_id) + if snap_image is None: + self.log.error( + 'Unable to find matching snap_image for job_id %s', + self.unique) + return + + 
snap_image.external_id = external_id + snap_image.state = nodedb.READY + self.log.debug('Image %s is ready with external_id %s', + self.snap_image_id, external_id) + finally: + super(ImageUploadJob, self).onCompleted() + + def onDisconnect(self): + try: + self.log.error('Image %s failed to upload due to gear disconnect.', + self.snap_image_id) + self.nodepool.deleteImage(self.snap_image_id) + finally: + super(ImageUploadJob, self).onDisconnect() + + +class ImageDeleteJob(NodepoolJob): + log = logging.getLogger("jobs.ImageDeleteJob") + + def __init__(self, image_id, nodepool): + self.image_id = image_id + job_name = 'image-delete:%s' % image_id + super(ImageDeleteJob, self).__init__(job_name, '', nodepool) diff --git a/nodepool/nodepool.py b/nodepool/nodepool.py index 8bf1d9290..cf32d8cf3 100644 --- a/nodepool/nodepool.py +++ b/nodepool/nodepool.py @@ -39,6 +39,7 @@ import provider_manager import stats import config as nodepool_config +import jobs MINS = 60 HOURS = 60 * MINS @@ -228,35 +229,6 @@ class NodeUpdateListener(threading.Thread): t.start() -class WatchableJob(gear.Job): - def __init__(self, *args, **kwargs): - super(WatchableJob, self).__init__(*args, **kwargs) - self._completion_handlers = [] - self._failure_handlers = [] - self._event = threading.Event() - - def addCompletionHandler(self, handler, *args, **kwargs): - self._completion_handlers.append((handler, args, kwargs)) - - def addFailureHandler(self, handler, *args, **kwargs): - self._failure_handlers.append((handler, args, kwargs)) - - def onCompleted(self): - for handler, args, kwargs in self._completion_handlers: - handler(self, *args, **kwargs) - - self._event.set() - - def onFailed(self): - for handler, args, kwargs in self._failure_handlers: - handler(self, *args, **kwargs) - - self._event.set() - - def waitForCompletion(self, timeout=None): - return self._event.wait(timeout) - - class JobTracker(object): def __init__(self): self._running_jobs = set() @@ -352,6 +324,14 @@ class 
GearmanClient(gear.Client): job = super(GearmanClient, self).handleWorkException(packet) job.onFailed() + def handleDisconnect(self, job): + super(GearmanClient, self).handleDisconnect(job) + job.onDisconnect() + + def handleWorkStatus(self, packet): + job = super(GearmanClient, self).handleWorkStatus(packet) + job.onWorkStatus() + class InstanceDeleter(threading.Thread): log = logging.getLogger("nodepool.InstanceDeleter") @@ -1100,7 +1080,6 @@ class NodePool(threading.Thread): self._instance_delete_threads = {} self._instance_delete_threads_lock = threading.Lock() self._image_build_jobs = JobTracker() - self._image_upload_jobs = JobTracker() def stop(self): self._stopped = True @@ -1115,8 +1094,8 @@ class NodePool(threading.Thread): def waitForBuiltImages(self): self.log.debug("Waiting for images to complete building.") - while len(self._image_build_jobs.running_jobs) > 0: - time.sleep(.5) + for job in self._image_build_jobs.running_jobs: + job.waitForCompletion() self.log.debug("Done waiting for images to complete building.") def loadConfig(self): @@ -1633,8 +1612,6 @@ class NodePool(threading.Thread): return t def buildImage(self, image): - gearman_job = None - # check if we already have this item in the queue with self.getDB().getSession() as session: queued_images = session.getBuildingDibImagesByName(image.name) @@ -1650,8 +1627,6 @@ class NodePool(threading.Thread): filename = os.path.join(self.config.imagesdir, '%s-%s' % (image.name, str(timestamp))) - job_uuid = str(uuid4().hex) - dib_image = session.createDibImage(image_name=image.name, filename=filename, version=timestamp) @@ -1659,17 +1634,9 @@ class NodePool(threading.Thread): dib_image.image_name, dib_image.state) # Submit image-build job - job_data = json.dumps({ - 'image-id': str(dib_image.id) - }) - gearman_job = WatchableJob( - 'image-build:%s' % image.name, job_data, job_uuid) + gearman_job = jobs.ImageBuildJob(image.name, dib_image.id, + self) self._image_build_jobs.addJob(gearman_job) - 
gearman_job.addCompletionHandler( - self.handleImageBuildComplete, image_id=dib_image.id) - gearman_job.addFailureHandler( - self.handleImageBuildFailed, image_id=dib_image.id) - self.gearman_client.submitJob(gearman_job, timeout=300) self.log.debug("Queued image building task for %s" % image.name) @@ -1698,18 +1665,10 @@ class NodePool(threading.Thread): session.commit() # Submit image-upload job - gearman_job = WatchableJob( - 'image-upload:%s' % image_id, - json.dumps({ - 'provider': provider, - 'image-name': image_name - }), - job_uuid) - self._image_upload_jobs.addJob(gearman_job) - gearman_job.addCompletionHandler(self.handleImageUploadComplete, - snap_image_id=snap_image.id) + gearman_job = jobs.ImageUploadJob(image_id, provider, image_name, + snap_image.id, self) self.log.debug('Submitting image-upload job uuid: %s' % - (job_uuid,)) + (gearman_job.unique,)) self.gearman_client.submitJob(gearman_job, timeout=300) return gearman_job @@ -1717,53 +1676,6 @@ class NodePool(threading.Thread): self.log.exception( "Could not upload image %s on %s", image_name, provider) - def handleImageUploadComplete(self, job, snap_image_id): - job_uuid = job.unique - job_data = json.loads(job.data[0]) - external_id = job_data['external-id'] - - with self.getDB().getSession() as session: - snap_image = session.getSnapshotImage(snap_image_id) - if snap_image is None: - self.log.error( - 'Unable to find matching snap_image for job_id %s', - job_uuid) - return - - snap_image.external_id = external_id - snap_image.state = nodedb.READY - session.commit() - self.log.debug('Image %s is ready with external_id %s', - snap_image_id, external_id) - - def handleImageBuildComplete(self, job, image_id): - with self.getDB().getSession() as session: - dib_image = session.getDibImage(image_id) - if dib_image is None: - self.log.error( - 'Unable to find matching dib_image for image_id %s', - image_id) - return - dib_image.state = nodedb.READY - session.commit() - self.log.debug('DIB Image %s (id 
%d) is ready', - job.name.split(':', 1)[0], image_id) - - def handleImageBuildFailed(self, job, image_id): - with self.getDB().getSession() as session: - self.log.debug('DIB Image %s (id %d) failed to build. Deleting.', - job.name.split(':', 1)[0], image_id) - dib_image = session.getDibImage(image_id) - dib_image.delete() - - def handleImageDeleteComplete(self, job, image_id): - with self.getDB().getSession() as session: - dib_image = session.getDibImage(image_id) - - # Remove image from the nodedb - dib_image.state = nodedb.DELETE - dib_image.delete() - def launchNode(self, session, provider, label, target): try: self._launchNode(session, provider, label, target) @@ -1948,14 +1860,9 @@ class NodePool(threading.Thread): def deleteDibImage(self, dib_image): try: # Submit image-delete job - job_uuid = str(uuid4().hex) - gearman_job = WatchableJob( - 'image-delete:%s' % dib_image.id, - '', job_uuid - ) - gearman_job.addCompletionHandler(self.handleImageDeleteComplete, - image_id=dib_image.id) + gearman_job = jobs.ImageDeleteJob(dib_image.id, self) self.gearman_client.submitJob(gearman_job, timeout=300) + dib_image.delete() return gearman_job except Exception: self.log.exception('Could not submit delete job for image id %s', diff --git a/nodepool/tests/__init__.py b/nodepool/tests/__init__.py index b340999e6..d6f40cbff 100644 --- a/nodepool/tests/__init__.py +++ b/nodepool/tests/__init__.py @@ -15,6 +15,7 @@ """Common utilities used in testing""" +import errno import logging import os import pymysql @@ -95,7 +96,14 @@ class GearmanServerFixture(fixtures.Fixture): self.addCleanup(self.shutdownGearman) def shutdownGearman(self): - self.gearman_server.shutdown() + #TODO:greghaynes remove try once gear client protects against this + try: + self.gearman_server.shutdown() + except OSError as e: + if e.errno == errno.EBADF: + pass + else: + raise class GearmanClient(gear.Client): diff --git a/nodepool/tests/test_nodepool.py b/nodepool/tests/test_nodepool.py index 
3b74e54b6..abcb8e2cc 100644 --- a/nodepool/tests/test_nodepool.py +++ b/nodepool/tests/test_nodepool.py @@ -18,8 +18,8 @@ import threading import time import fixtures -import testtools +from nodepool import jobs from nodepool import tests from nodepool import nodedb import nodepool.fakeprovider @@ -486,9 +486,9 @@ class TestNodepool(tests.DBTestCase): self.assertEqual(len(nodes), 1) -class TestWatchableJob(testtools.TestCase): +class TestGearClient(tests.DBTestCase): def test_wait_for_completion(self): - wj = nodepool.nodepool.WatchableJob('test', 'test', 'test') + wj = jobs.WatchableJob('test', 'test', 'test') def call_on_completed(): time.sleep(.2) @@ -497,3 +497,24 @@ class TestWatchableJob(testtools.TestCase): t = threading.Thread(target=call_on_completed) t.start() wj.waitForCompletion() + + def test_handle_disconnect(self): + class MyJob(jobs.WatchableJob): + def __init__(self, *args, **kwargs): + super(MyJob, self).__init__(*args, **kwargs) + self.disconnect_called = False + + def onDisconnect(self): + super(MyJob, self).onDisconnect() + self.disconnect_called = True + + client = nodepool.nodepool.GearmanClient() + client.addServer('localhost', self.gearman_server.port) + client.waitForServer() + + job = MyJob('test-job', '', '') + client.submitJob(job) + + self.gearman_server.shutdown() + job.waitForCompletion() + self.assertEqual(job.disconnect_called, True)