diff --git a/nodepool/jobs.py b/nodepool/jobs.py
new file mode 100644
index 000000000..6900f1cd8
--- /dev/null
+++ b/nodepool/jobs.py
@@ -0,0 +1,206 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import logging
+import uuid
+import threading
+
+import gear
+
+import nodedb
+
+
+class WatchableJob(gear.Job):
+    """A gear job that signals when it reaches a terminal state.
+
+    Completion, failure and disconnect all set an internal event and
+    invoke every registered completion handler with the job.
+    """
+
+    def __init__(self, *args, **kwargs):
+        super(WatchableJob, self).__init__(*args, **kwargs)
+        self._completion_handlers = []
+        self._event = threading.Event()
+
+    def _handleCompletion(self, mode=None):
+        # Wake waiters before running handlers; ``mode`` is unused.
+        self._event.set()
+        for handler in self._completion_handlers:
+            handler(self)
+
+    def addCompletionHandler(self, handler):
+        """Register ``handler(job)`` to run when the job finishes."""
+        self._completion_handlers.append(handler)
+
+    def onCompleted(self):
+        self._handleCompletion()
+
+    def onFailed(self):
+        self._handleCompletion()
+
+    def onDisconnect(self):
+        self._handleCompletion()
+
+    def onWorkStatus(self):
+        pass
+
+    def waitForCompletion(self, timeout=None):
+        """Wait up to ``timeout`` seconds for the job to finish."""
+        return self._event.wait(timeout)
+
+
+class NodepoolJob(WatchableJob):
+    """Watchable job with a JSON payload and access to nodepool's DB."""
+
+    def __init__(self, job_name, job_data_obj, nodepool):
+        job_uuid = uuid.uuid4().hex
+        job_data = json.dumps(job_data_obj)
+        super(NodepoolJob, self).__init__(job_name, job_data, job_uuid)
+        self.nodepool = nodepool
+
+    def getDbSession(self):
+        return self.nodepool.getDB().getSession()
+
+
+class ImageBuildJob(NodepoolJob):
+    """Gearman 'image-build' job that tracks one DIB image record."""
+
+    log = logging.getLogger("jobs.ImageBuildJob")
+
+    def __init__(self, image_name, image_id, nodepool):
+        # Kept for log messages: self.name is 'image-build:<image_name>',
+        # so splitting it on ':' and taking [0] only yields the prefix.
+        self.image_name = image_name
+        self.image_id = image_id
+        job_data = {'image-id': str(image_id)}
+        job_name = 'image-build:%s' % image_name
+        super(ImageBuildJob, self).__init__(job_name, job_data, nodepool)
+
+    def _deleteImage(self, record_only=False):
+        """Delete the DIB image after an unsuccessful build.
+
+        If ``record_only`` is true only the database record is deleted;
+        otherwise deletion is delegated to ``nodepool.deleteDibImage``.
+        """
+        with self.getDbSession() as session:
+            self.log.debug('DIB Image %s (id %d) failed to build. Deleting.',
+                           self.image_name, self.image_id)
+            dib_image = session.getDibImage(self.image_id)
+            if dib_image is None:
+                # The record is already gone; nothing to delete.
+                self.log.error(
+                    'Unable to find matching dib_image for image_id %s',
+                    self.image_id)
+                return
+            # TODO:greghaynes remove this check and only call deleteDibImage
+            # when it is smart enough that it doesn't submit a delete job
+            if not record_only:
+                self.nodepool.deleteDibImage(dib_image)
+            else:
+                dib_image.delete()
+
+    def onCompleted(self):
+        """Mark the DIB image READY, then notify completion handlers."""
+        try:
+            with self.getDbSession() as session:
+                dib_image = session.getDibImage(self.image_id)
+                if dib_image is None:
+                    self.log.error(
+                        'Unable to find matching dib_image for image_id %s',
+                        self.image_id)
+                    return
+                dib_image.state = nodedb.READY
+                session.commit()
+                self.log.debug('DIB Image %s (id %d) is ready',
+                               self.image_name, self.image_id)
+        finally:
+            super(ImageBuildJob, self).onCompleted()
+
+    def onFailed(self):
+        """Delete the image record, then notify completion handlers."""
+        try:
+            self.log.error('DIB Image %s (id %d) failed to build. Deleting.',
+                           self.image_name, self.image_id)
+            self._deleteImage(True)
+        finally:
+            super(ImageBuildJob, self).onFailed()
+
+    def onDisconnect(self):
+        """Delete the image via nodepool, then notify handlers."""
+        try:
+            self.log.error('DIB Image %s (id %d) failed due to gear disconnect.',
+                           self.image_name, self.image_id)
+            self._deleteImage()
+        finally:
+            super(ImageBuildJob, self).onDisconnect()
+
+
+class ImageUploadJob(NodepoolJob):
+    """Gearman 'image-upload' job that tracks one snapshot image record."""
+
+    log = logging.getLogger("jobs.ImageUploadJob")
+
+    def __init__(self, image_id, provider_name, external_name, snap_image_id,
+                 nodepool):
+        self.image_id = image_id
+        self.snap_image_id = snap_image_id
+        job_data = {
+            'image-name': external_name,
+            'provider': provider_name
+        }
+        job_name = 'image-upload:%s' % image_id
+        super(ImageUploadJob, self).__init__(job_name, job_data, nodepool)
+
+    def onCompleted(self):
+        """Store 'external-id' from the job data and mark the image READY."""
+        try:
+            job_data = json.loads(self.data[0])
+            external_id = job_data['external-id']
+
+            with self.getDbSession() as session:
+                snap_image = session.getSnapshotImage(self.snap_image_id)
+                if snap_image is None:
+                    self.log.error(
+                        'Unable to find matching snap_image for job_id %s',
+                        self.unique)
+                    return
+
+                snap_image.external_id = external_id
+                snap_image.state = nodedb.READY
+                # Commit explicitly, matching ImageBuildJob.onCompleted.
+                session.commit()
+                self.log.debug('Image %s is ready with external_id %s',
+                               self.snap_image_id, external_id)
+        finally:
+            super(ImageUploadJob, self).onCompleted()
+
+    def onDisconnect(self):
+        """Ask nodepool to delete the snapshot image, then notify handlers."""
+        try:
+            self.log.error('Image %s failed to upload due to gear disconnect.',
+                           self.snap_image_id)
+            self.nodepool.deleteImage(self.snap_image_id)
+        finally:
+            super(ImageUploadJob, self).onDisconnect()
+
+
+class ImageDeleteJob(NodepoolJob):
+    """Gearman 'image-delete' job for the image ``image_id``."""
+
+    log = logging.getLogger("jobs.ImageDeleteJob")
+
+    def __init__(self, image_id, nodepool):
+        self.image_id = image_id
+        job_name = 'image-delete:%s' % image_id
+        super(ImageDeleteJob, self).__init__(job_name, '', nodepool)
diff --git a/nodepool/nodepool.py b/nodepool/nodepool.py
index 8bf1d9290..cf32d8cf3 100644
--- a/nodepool/nodepool.py
+++ b/nodepool/nodepool.py
@@ 
-39,6 +39,7 @@ import provider_manager import stats import config as nodepool_config +import jobs MINS = 60 HOURS = 60 * MINS @@ -228,35 +229,6 @@ class NodeUpdateListener(threading.Thread): t.start() -class WatchableJob(gear.Job): - def __init__(self, *args, **kwargs): - super(WatchableJob, self).__init__(*args, **kwargs) - self._completion_handlers = [] - self._failure_handlers = [] - self._event = threading.Event() - - def addCompletionHandler(self, handler, *args, **kwargs): - self._completion_handlers.append((handler, args, kwargs)) - - def addFailureHandler(self, handler, *args, **kwargs): - self._failure_handlers.append((handler, args, kwargs)) - - def onCompleted(self): - for handler, args, kwargs in self._completion_handlers: - handler(self, *args, **kwargs) - - self._event.set() - - def onFailed(self): - for handler, args, kwargs in self._failure_handlers: - handler(self, *args, **kwargs) - - self._event.set() - - def waitForCompletion(self, timeout=None): - return self._event.wait(timeout) - - class JobTracker(object): def __init__(self): self._running_jobs = set() @@ -352,6 +324,14 @@ class GearmanClient(gear.Client): job = super(GearmanClient, self).handleWorkException(packet) job.onFailed() + def handleDisconnect(self, job): + super(GearmanClient, self).handleDisconnect(job) + job.onDisconnect() + + def handleWorkStatus(self, packet): + job = super(GearmanClient, self).handleWorkStatus(packet) + job.onWorkStatus() + class InstanceDeleter(threading.Thread): log = logging.getLogger("nodepool.InstanceDeleter") @@ -1100,7 +1080,6 @@ class NodePool(threading.Thread): self._instance_delete_threads = {} self._instance_delete_threads_lock = threading.Lock() self._image_build_jobs = JobTracker() - self._image_upload_jobs = JobTracker() def stop(self): self._stopped = True @@ -1115,8 +1094,8 @@ class NodePool(threading.Thread): def waitForBuiltImages(self): self.log.debug("Waiting for images to complete building.") - while 
len(self._image_build_jobs.running_jobs) > 0: - time.sleep(.5) + for job in self._image_build_jobs.running_jobs: + job.waitForCompletion() self.log.debug("Done waiting for images to complete building.") def loadConfig(self): @@ -1633,8 +1612,6 @@ class NodePool(threading.Thread): return t def buildImage(self, image): - gearman_job = None - # check if we already have this item in the queue with self.getDB().getSession() as session: queued_images = session.getBuildingDibImagesByName(image.name) @@ -1650,8 +1627,6 @@ class NodePool(threading.Thread): filename = os.path.join(self.config.imagesdir, '%s-%s' % (image.name, str(timestamp))) - job_uuid = str(uuid4().hex) - dib_image = session.createDibImage(image_name=image.name, filename=filename, version=timestamp) @@ -1659,17 +1634,9 @@ class NodePool(threading.Thread): dib_image.image_name, dib_image.state) # Submit image-build job - job_data = json.dumps({ - 'image-id': str(dib_image.id) - }) - gearman_job = WatchableJob( - 'image-build:%s' % image.name, job_data, job_uuid) + gearman_job = jobs.ImageBuildJob(image.name, dib_image.id, + self) self._image_build_jobs.addJob(gearman_job) - gearman_job.addCompletionHandler( - self.handleImageBuildComplete, image_id=dib_image.id) - gearman_job.addFailureHandler( - self.handleImageBuildFailed, image_id=dib_image.id) - self.gearman_client.submitJob(gearman_job, timeout=300) self.log.debug("Queued image building task for %s" % image.name) @@ -1698,18 +1665,10 @@ class NodePool(threading.Thread): session.commit() # Submit image-upload job - gearman_job = WatchableJob( - 'image-upload:%s' % image_id, - json.dumps({ - 'provider': provider, - 'image-name': image_name - }), - job_uuid) - self._image_upload_jobs.addJob(gearman_job) - gearman_job.addCompletionHandler(self.handleImageUploadComplete, - snap_image_id=snap_image.id) + gearman_job = jobs.ImageUploadJob(image_id, provider, image_name, + snap_image.id, self) self.log.debug('Submitting image-upload job uuid: %s' % - 
(job_uuid,)) + (gearman_job.unique,)) self.gearman_client.submitJob(gearman_job, timeout=300) return gearman_job @@ -1717,53 +1676,6 @@ class NodePool(threading.Thread): self.log.exception( "Could not upload image %s on %s", image_name, provider) - def handleImageUploadComplete(self, job, snap_image_id): - job_uuid = job.unique - job_data = json.loads(job.data[0]) - external_id = job_data['external-id'] - - with self.getDB().getSession() as session: - snap_image = session.getSnapshotImage(snap_image_id) - if snap_image is None: - self.log.error( - 'Unable to find matching snap_image for job_id %s', - job_uuid) - return - - snap_image.external_id = external_id - snap_image.state = nodedb.READY - session.commit() - self.log.debug('Image %s is ready with external_id %s', - snap_image_id, external_id) - - def handleImageBuildComplete(self, job, image_id): - with self.getDB().getSession() as session: - dib_image = session.getDibImage(image_id) - if dib_image is None: - self.log.error( - 'Unable to find matching dib_image for image_id %s', - image_id) - return - dib_image.state = nodedb.READY - session.commit() - self.log.debug('DIB Image %s (id %d) is ready', - job.name.split(':', 1)[0], image_id) - - def handleImageBuildFailed(self, job, image_id): - with self.getDB().getSession() as session: - self.log.debug('DIB Image %s (id %d) failed to build. 
Deleting.', - job.name.split(':', 1)[0], image_id) - dib_image = session.getDibImage(image_id) - dib_image.delete() - - def handleImageDeleteComplete(self, job, image_id): - with self.getDB().getSession() as session: - dib_image = session.getDibImage(image_id) - - # Remove image from the nodedb - dib_image.state = nodedb.DELETE - dib_image.delete() - def launchNode(self, session, provider, label, target): try: self._launchNode(session, provider, label, target) @@ -1948,14 +1860,9 @@ class NodePool(threading.Thread): def deleteDibImage(self, dib_image): try: # Submit image-delete job - job_uuid = str(uuid4().hex) - gearman_job = WatchableJob( - 'image-delete:%s' % dib_image.id, - '', job_uuid - ) - gearman_job.addCompletionHandler(self.handleImageDeleteComplete, - image_id=dib_image.id) + gearman_job = jobs.ImageDeleteJob(dib_image.id, self) self.gearman_client.submitJob(gearman_job, timeout=300) + dib_image.delete() return gearman_job except Exception: self.log.exception('Could not submit delete job for image id %s', diff --git a/nodepool/tests/__init__.py b/nodepool/tests/__init__.py index b340999e6..d6f40cbff 100644 --- a/nodepool/tests/__init__.py +++ b/nodepool/tests/__init__.py @@ -15,6 +15,7 @@ """Common utilities used in testing""" +import errno import logging import os import pymysql @@ -95,7 +96,14 @@ class GearmanServerFixture(fixtures.Fixture): self.addCleanup(self.shutdownGearman) def shutdownGearman(self): - self.gearman_server.shutdown() + #TODO:greghaynes remove try once gear client protects against this + try: + self.gearman_server.shutdown() + except OSError as e: + if e.errno == errno.EBADF: + pass + else: + raise class GearmanClient(gear.Client): diff --git a/nodepool/tests/test_nodepool.py b/nodepool/tests/test_nodepool.py index 3b74e54b6..abcb8e2cc 100644 --- a/nodepool/tests/test_nodepool.py +++ b/nodepool/tests/test_nodepool.py @@ -18,8 +18,8 @@ import threading import time import fixtures -import testtools +from nodepool import jobs from 
nodepool import tests from nodepool import nodedb import nodepool.fakeprovider @@ -486,9 +486,9 @@ class TestNodepool(tests.DBTestCase): self.assertEqual(len(nodes), 1) -class TestWatchableJob(testtools.TestCase): +class TestGearClient(tests.DBTestCase): def test_wait_for_completion(self): - wj = nodepool.nodepool.WatchableJob('test', 'test', 'test') + wj = jobs.WatchableJob('test', 'test', 'test') def call_on_completed(): time.sleep(.2) @@ -497,3 +497,24 @@ class TestWatchableJob(testtools.TestCase): t = threading.Thread(target=call_on_completed) t.start() wj.waitForCompletion() + + def test_handle_disconnect(self): + class MyJob(jobs.WatchableJob): + def __init__(self, *args, **kwargs): + super(MyJob, self).__init__(*args, **kwargs) + self.disconnect_called = False + + def onDisconnect(self): + super(MyJob, self).onDisconnect() + self.disconnect_called = True + + client = nodepool.nodepool.GearmanClient() + client.addServer('localhost', self.gearman_server.port) + client.waitForServer() + + job = MyJob('test-job', '', '') + client.submitJob(job) + + self.gearman_server.shutdown() + job.waitForCompletion() + self.assertEqual(job.disconnect_called, True)