Clean up builder jobs and add disconnect support

If nodepool disconnects from the gear server then all outstanding jobs
are removed. Before this change we would not detect the disconnect and
would wait forever for the outstanding jobs to complete.

Moved the job tracking into job subclasses to make the state machine
more clear.

Change-Id: Ifaf6522ba54156a2915bab9416eefa7bd14fe70b
Depends-On: If5352a373d5fa61dd1ee661f4a37976b3447dd9d
This commit is contained in:
Gregory Haynes 2016-01-26 05:23:06 +00:00 committed by Clark Boylan
parent 4024a23786
commit 1fa3ee7779
4 changed files with 219 additions and 115 deletions

168
nodepool/jobs.py Normal file
View File

@ -0,0 +1,168 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import uuid
import threading
import gear
import nodedb
class WatchableJob(gear.Job):
    """A gear job whose completion can be waited on and observed.

    Callbacks registered with addCompletionHandler() fire whenever the
    job reaches a terminal state: success, failure, or a gearman server
    disconnect. waitForCompletion() blocks on the same event.
    """

    def __init__(self, *args, **kwargs):
        super(WatchableJob, self).__init__(*args, **kwargs)
        self._completion_handlers = []
        self._event = threading.Event()

    def _handleCompletion(self, mode=None):
        # Release waiters first, then notify each registered callback.
        self._event.set()
        for callback in self._completion_handlers:
            callback(self)

    def addCompletionHandler(self, handler):
        """Register a callable invoked with this job once it completes."""
        self._completion_handlers.append(handler)

    def onCompleted(self):
        self._handleCompletion()

    def onFailed(self):
        self._handleCompletion()

    def onDisconnect(self):
        # A server disconnect drops all outstanding jobs; treating it as
        # completion keeps waiters from blocking forever.
        self._handleCompletion()

    def onWorkStatus(self):
        pass

    def waitForCompletion(self, timeout=None):
        """Block until the job completes; returns False on timeout."""
        return self._event.wait(timeout)
class NodepoolJob(WatchableJob):
    """Base class for nodepool gearman jobs.

    Serializes the payload object to JSON and assigns a random unique
    job id; keeps a handle on the owning nodepool for DB access.
    """

    def __init__(self, job_name, job_data_obj, nodepool):
        unique_id = uuid.uuid4().hex
        payload = json.dumps(job_data_obj)
        super(NodepoolJob, self).__init__(job_name, payload, unique_id)
        self.nodepool = nodepool

    def getDbSession(self):
        """Return a database session from the owning nodepool."""
        return self.nodepool.getDB().getSession()
class ImageBuildJob(NodepoolJob):
    """Gearman job that builds a DIB image and tracks its DB record.

    The DibImage row is marked READY on success and deleted on failure
    or gearman-server disconnect so it is not left "building" forever.
    """

    log = logging.getLogger("jobs.ImageBuildJob")

    def __init__(self, image_name, image_id, nodepool):
        # Database id of the DibImage row this job is building.
        self.image_id = image_id
        job_data = {'image-id': str(image_id)}
        job_name = 'image-build:%s' % image_name
        super(ImageBuildJob, self).__init__(job_name, job_data, nodepool)

    def _imageName(self):
        # Job name is 'image-build:<image_name>'; the image name is the
        # part after the first colon. (Index 0 would be the literal
        # 'image-build' job-type prefix, which the original logged by
        # mistake.)
        return self.name.split(':', 1)[1]

    def _deleteImage(self, record_only=False):
        """Delete the image this job was building.

        :param record_only: when True, only the database record is
            removed; otherwise the full nodepool delete path (which may
            submit a delete job) is used.
        """
        with self.getDbSession() as session:
            self.log.debug('DIB Image %s (id %d) failed to build. Deleting.',
                           self._imageName(), self.image_id)
            dib_image = session.getDibImage(self.image_id)
            # TODO:greghaynes remove this check and only call deleteDibImage
            # when it is smart enough that it doesn't submit a delete job
            if not record_only:
                self.nodepool.deleteDibImage(dib_image)
            else:
                dib_image.delete()

    def onCompleted(self):
        try:
            with self.getDbSession() as session:
                dib_image = session.getDibImage(self.image_id)
                if dib_image is None:
                    self.log.error(
                        'Unable to find matching dib_image for image_id %s',
                        self.image_id)
                    return
                dib_image.state = nodedb.READY
                session.commit()
                self.log.debug('DIB Image %s (id %d) is ready',
                               self._imageName(), self.image_id)
        finally:
            # Always fire completion handlers, even if the DB update failed.
            super(ImageBuildJob, self).onCompleted()

    def onFailed(self):
        try:
            self.log.error('DIB Image %s (id %d) failed to build. Deleting.',
                           self._imageName(), self.image_id)
            # record_only: the build never produced an image to clean up.
            self._deleteImage(True)
        finally:
            super(ImageBuildJob, self).onFailed()

    def onDisconnect(self):
        try:
            self.log.error('DIB Image %s (id %d) failed due to gear '
                           'disconnect.', self._imageName(), self.image_id)
            # Full delete: with the gear server gone we cannot know the
            # build's fate, so remove the image entirely.
            self._deleteImage()
        finally:
            super(ImageBuildJob, self).onDisconnect()
class ImageUploadJob(NodepoolJob):
    """Gearman job that uploads a snapshot image to a provider.

    On success the SnapshotImage row is marked READY with the
    provider-assigned external id; on gearman disconnect the image is
    deleted so it is not left pending forever.
    """

    log = logging.getLogger("jobs.ImageUploadJob")

    def __init__(self, image_id, provider_name, external_name, snap_image_id,
                 nodepool):
        # Id of the DIB image being uploaded (used in the job name).
        self.image_id = image_id
        # Id of the SnapshotImage DB row tracking this upload.
        self.snap_image_id = snap_image_id
        job_data = {
            'image-name': external_name,
            'provider': provider_name
        }
        job_name = 'image-upload:%s' % image_id
        super(ImageUploadJob, self).__init__(job_name, job_data, nodepool)

    def onCompleted(self):
        try:
            # The worker reports the provider-assigned id in the job's
            # returned JSON payload.
            job_data = json.loads(self.data[0])
            external_id = job_data['external-id']
            with self.getDbSession() as session:
                snap_image = session.getSnapshotImage(self.snap_image_id)
                if snap_image is None:
                    self.log.error(
                        'Unable to find matching snap_image for job_id %s',
                        self.unique)
                    return
                snap_image.external_id = external_id
                snap_image.state = nodedb.READY
                # NOTE(review): no explicit session.commit() here, unlike
                # ImageBuildJob.onCompleted -- presumably the session
                # context manager commits on exit; confirm.
                self.log.debug('Image %s is ready with external_id %s',
                               self.snap_image_id, external_id)
        finally:
            # Always fire completion handlers, even if the DB update failed.
            super(ImageUploadJob, self).onCompleted()

    def onDisconnect(self):
        try:
            self.log.error('Image %s failed to upload due to gear disconnect.',
                           self.snap_image_id)
            self.nodepool.deleteImage(self.snap_image_id)
        finally:
            super(ImageUploadJob, self).onDisconnect()
class ImageDeleteJob(NodepoolJob):
    """Gearman job requesting deletion of a DIB image.

    Carries no payload; the image to delete is identified by the job
    name suffix.
    """

    log = logging.getLogger("jobs.ImageDeleteJob")

    def __init__(self, image_id, nodepool):
        self.image_id = image_id
        super(ImageDeleteJob, self).__init__(
            'image-delete:%s' % image_id, '', nodepool)

View File

@ -39,6 +39,7 @@ import provider_manager
import stats
import config as nodepool_config
import jobs
MINS = 60
HOURS = 60 * MINS
@ -228,35 +229,6 @@ class NodeUpdateListener(threading.Thread):
t.start()
class WatchableJob(gear.Job):
def __init__(self, *args, **kwargs):
super(WatchableJob, self).__init__(*args, **kwargs)
self._completion_handlers = []
self._failure_handlers = []
self._event = threading.Event()
def addCompletionHandler(self, handler, *args, **kwargs):
self._completion_handlers.append((handler, args, kwargs))
def addFailureHandler(self, handler, *args, **kwargs):
self._failure_handlers.append((handler, args, kwargs))
def onCompleted(self):
for handler, args, kwargs in self._completion_handlers:
handler(self, *args, **kwargs)
self._event.set()
def onFailed(self):
for handler, args, kwargs in self._failure_handlers:
handler(self, *args, **kwargs)
self._event.set()
def waitForCompletion(self, timeout=None):
return self._event.wait(timeout)
class JobTracker(object):
def __init__(self):
self._running_jobs = set()
@ -352,6 +324,14 @@ class GearmanClient(gear.Client):
job = super(GearmanClient, self).handleWorkException(packet)
job.onFailed()
def handleDisconnect(self, job):
super(GearmanClient, self).handleDisconnect(job)
job.onDisconnect()
def handleWorkStatus(self, packet):
job = super(GearmanClient, self).handleWorkStatus(packet)
job.onWorkStatus()
class InstanceDeleter(threading.Thread):
log = logging.getLogger("nodepool.InstanceDeleter")
@ -1100,7 +1080,6 @@ class NodePool(threading.Thread):
self._instance_delete_threads = {}
self._instance_delete_threads_lock = threading.Lock()
self._image_build_jobs = JobTracker()
self._image_upload_jobs = JobTracker()
def stop(self):
self._stopped = True
@ -1115,8 +1094,8 @@ class NodePool(threading.Thread):
def waitForBuiltImages(self):
self.log.debug("Waiting for images to complete building.")
while len(self._image_build_jobs.running_jobs) > 0:
time.sleep(.5)
for job in self._image_build_jobs.running_jobs:
job.waitForCompletion()
self.log.debug("Done waiting for images to complete building.")
def loadConfig(self):
@ -1633,8 +1612,6 @@ class NodePool(threading.Thread):
return t
def buildImage(self, image):
gearman_job = None
# check if we already have this item in the queue
with self.getDB().getSession() as session:
queued_images = session.getBuildingDibImagesByName(image.name)
@ -1650,8 +1627,6 @@ class NodePool(threading.Thread):
filename = os.path.join(self.config.imagesdir,
'%s-%s' %
(image.name, str(timestamp)))
job_uuid = str(uuid4().hex)
dib_image = session.createDibImage(image_name=image.name,
filename=filename,
version=timestamp)
@ -1659,17 +1634,9 @@ class NodePool(threading.Thread):
dib_image.image_name, dib_image.state)
# Submit image-build job
job_data = json.dumps({
'image-id': str(dib_image.id)
})
gearman_job = WatchableJob(
'image-build:%s' % image.name, job_data, job_uuid)
gearman_job = jobs.ImageBuildJob(image.name, dib_image.id,
self)
self._image_build_jobs.addJob(gearman_job)
gearman_job.addCompletionHandler(
self.handleImageBuildComplete, image_id=dib_image.id)
gearman_job.addFailureHandler(
self.handleImageBuildFailed, image_id=dib_image.id)
self.gearman_client.submitJob(gearman_job, timeout=300)
self.log.debug("Queued image building task for %s" %
image.name)
@ -1698,18 +1665,10 @@ class NodePool(threading.Thread):
session.commit()
# Submit image-upload job
gearman_job = WatchableJob(
'image-upload:%s' % image_id,
json.dumps({
'provider': provider,
'image-name': image_name
}),
job_uuid)
self._image_upload_jobs.addJob(gearman_job)
gearman_job.addCompletionHandler(self.handleImageUploadComplete,
snap_image_id=snap_image.id)
gearman_job = jobs.ImageUploadJob(image_id, provider, image_name,
snap_image.id, self)
self.log.debug('Submitting image-upload job uuid: %s' %
(job_uuid,))
(gearman_job.unique,))
self.gearman_client.submitJob(gearman_job, timeout=300)
return gearman_job
@ -1717,53 +1676,6 @@ class NodePool(threading.Thread):
self.log.exception(
"Could not upload image %s on %s", image_name, provider)
def handleImageUploadComplete(self, job, snap_image_id):
job_uuid = job.unique
job_data = json.loads(job.data[0])
external_id = job_data['external-id']
with self.getDB().getSession() as session:
snap_image = session.getSnapshotImage(snap_image_id)
if snap_image is None:
self.log.error(
'Unable to find matching snap_image for job_id %s',
job_uuid)
return
snap_image.external_id = external_id
snap_image.state = nodedb.READY
session.commit()
self.log.debug('Image %s is ready with external_id %s',
snap_image_id, external_id)
def handleImageBuildComplete(self, job, image_id):
with self.getDB().getSession() as session:
dib_image = session.getDibImage(image_id)
if dib_image is None:
self.log.error(
'Unable to find matching dib_image for image_id %s',
image_id)
return
dib_image.state = nodedb.READY
session.commit()
self.log.debug('DIB Image %s (id %d) is ready',
job.name.split(':', 1)[0], image_id)
def handleImageBuildFailed(self, job, image_id):
with self.getDB().getSession() as session:
self.log.debug('DIB Image %s (id %d) failed to build. Deleting.',
job.name.split(':', 1)[0], image_id)
dib_image = session.getDibImage(image_id)
dib_image.delete()
def handleImageDeleteComplete(self, job, image_id):
with self.getDB().getSession() as session:
dib_image = session.getDibImage(image_id)
# Remove image from the nodedb
dib_image.state = nodedb.DELETE
dib_image.delete()
def launchNode(self, session, provider, label, target):
try:
self._launchNode(session, provider, label, target)
@ -1948,14 +1860,9 @@ class NodePool(threading.Thread):
def deleteDibImage(self, dib_image):
try:
# Submit image-delete job
job_uuid = str(uuid4().hex)
gearman_job = WatchableJob(
'image-delete:%s' % dib_image.id,
'', job_uuid
)
gearman_job.addCompletionHandler(self.handleImageDeleteComplete,
image_id=dib_image.id)
gearman_job = jobs.ImageDeleteJob(dib_image.id, self)
self.gearman_client.submitJob(gearman_job, timeout=300)
dib_image.delete()
return gearman_job
except Exception:
self.log.exception('Could not submit delete job for image id %s',

View File

@ -15,6 +15,7 @@
"""Common utilities used in testing"""
import errno
import logging
import os
import pymysql
@ -95,7 +96,14 @@ class GearmanServerFixture(fixtures.Fixture):
self.addCleanup(self.shutdownGearman)
def shutdownGearman(self):
self.gearman_server.shutdown()
#TODO:greghaynes remove try once gear client protects against this
try:
self.gearman_server.shutdown()
except OSError as e:
if e.errno == errno.EBADF:
pass
else:
raise
class GearmanClient(gear.Client):

View File

@ -18,8 +18,8 @@ import threading
import time
import fixtures
import testtools
from nodepool import jobs
from nodepool import tests
from nodepool import nodedb
import nodepool.fakeprovider
@ -486,9 +486,9 @@ class TestNodepool(tests.DBTestCase):
self.assertEqual(len(nodes), 1)
class TestWatchableJob(testtools.TestCase):
class TestGearClient(tests.DBTestCase):
def test_wait_for_completion(self):
wj = nodepool.nodepool.WatchableJob('test', 'test', 'test')
wj = jobs.WatchableJob('test', 'test', 'test')
def call_on_completed():
time.sleep(.2)
@ -497,3 +497,24 @@ class TestWatchableJob(testtools.TestCase):
t = threading.Thread(target=call_on_completed)
t.start()
wj.waitForCompletion()
def test_handle_disconnect(self):
class MyJob(jobs.WatchableJob):
def __init__(self, *args, **kwargs):
super(MyJob, self).__init__(*args, **kwargs)
self.disconnect_called = False
def onDisconnect(self):
super(MyJob, self).onDisconnect()
self.disconnect_called = True
client = nodepool.nodepool.GearmanClient()
client.addServer('localhost', self.gearman_server.port)
client.waitForServer()
job = MyJob('test-job', '', '')
client.submitJob(job)
self.gearman_server.shutdown()
job.waitForCompletion()
self.assertEqual(job.disconnect_called, True)