diff --git a/doc/source/configuration.rst b/doc/source/configuration.rst index 710ece833..e88c39348 100644 --- a/doc/source/configuration.rst +++ b/doc/source/configuration.rst @@ -32,6 +32,29 @@ Example:: script-dir: /path/to/script/dir +elements-dir +------------ + +If an image is configured to use disk-image-builder and glance to locally +create and upload images, then a collection of disk-image-builder elements +must be present. The ``elements-dir`` parameter indicates a directory +that holds one or more elements. + +Example:: + + elements-dir: /path/to/elements/dir + +images-dir +---------- + +When we generate images using disk-image-builder they need to be +written to somewhere. The ``images-dir`` parameter is the place to +write them. + +Example:: + + images-dir: /path/to/images/dir + dburi ----- Indicates the URI for the database connection. See the `SQLAlchemy diff --git a/nodepool/cmd/nodepoolcmd.py b/nodepool/cmd/nodepoolcmd.py index 4ebb58159..9df85d01b 100644 --- a/nodepool/cmd/nodepoolcmd.py +++ b/nodepool/cmd/nodepoolcmd.py @@ -48,12 +48,21 @@ class NodePoolCmd(object): help='list images') cmd_image_list.set_defaults(func=self.image_list) + cmd_dib_image_list = subparsers.add_parser('dib-image-list', + help='list dib images') + cmd_dib_image_list.set_defaults(func=self.dib_image_list) + cmd_image_update = subparsers.add_parser('image-update', help='update image') cmd_image_update.add_argument('provider', help='provider name') cmd_image_update.add_argument('image', help='image name') cmd_image_update.set_defaults(func=self.image_update) + cmd_image_build = subparsers.add_parser('image-build', + help='build image') + cmd_image_build.add_argument('image', help='image name') + cmd_image_build.set_defaults(func=self.image_build) + cmd_alien_list = subparsers.add_parser( 'alien-list', help='list nodes not accounted for by nodepool') @@ -89,6 +98,20 @@ class NodePoolCmd(object): cmd_image_delete.set_defaults(func=self.image_delete) cmd_image_delete.add_argument('id', help='image id') + cmd_dib_image_delete = subparsers.add_parser( + 'dib-image-delete', + help='delete a dib image') + cmd_dib_image_delete.set_defaults(func=self.dib_image_delete) + cmd_dib_image_delete.add_argument('id', help='dib image id') + + cmd_image_upload = subparsers.add_parser( + 'image-upload', + help='upload an image') + cmd_image_upload.set_defaults(func=self.image_upload) + cmd_image_upload.add_argument('provider', help='provider name', + nargs='?', default='all') + cmd_image_upload.add_argument('image', help='image name') + self.args = parser.parse_args() def setup_logging(self): @@ -114,6 +137,19 @@ class NodePoolCmd(object): '%.02f' % ((now - node.state_time) / 3600)]) print t + def dib_image_list(self): + t = PrettyTable(["ID", "Image", "Filename", "Version", + "State", "Age (hours)"]) + t.align = 'l' + now = time.time() + with self.pool.getDB().getSession() as session: + for image in session.getDibImages(): + t.add_row([image.id, image.image_name, + image.filename, image.version, + nodedb.STATE_NAMES[image.state], + '%.02f' % ((now - image.state_time) / 3600)]) + print t + def image_list(self): t = PrettyTable(["ID", "Provider", "Image", "Hostname", "Version", "Image ID", "Server ID", "State", "Age (hours)"]) @@ -129,12 +165,54 @@ class NodePoolCmd(object): print t def image_update(self): + with self.pool.getDB().getSession() as session: + self.pool.reconfigureManagers(self.pool.config) + if self.args.image in self.pool.config.diskimages: + # first build image, then upload it + self.image_build() + self.image_upload() + else: + if self.args.provider == 'all': + # iterate for all providers listed in label + for provider in self.pool.config.providers.values(): + for image in provider.images.values(): + if self.args.image == image.name: + self.pool.updateImage(session, provider.name, + self.args.image) + break + else: + provider = self.pool.config.providers[self.args.provider] + self.pool.updateImage(session, provider.self.args.image) + + def image_build(self): + if self.args.image not in self.pool.config.diskimages: + # only can build disk images, not snapshots + raise Exception("Trying to build a non disk-image-builder " + "image: %s" % self.args.image) + + self.pool.reconfigureImageBuilder() + self.pool.buildImage(self.pool.config.diskimages[self.args.image]) + self.pool.waitForBuiltImages() + + def image_upload(self): self.pool.reconfigureManagers(self.pool.config) - provider = self.pool.config.providers[self.args.provider] - image = provider.images[self.args.image] + if not self.args.image in self.pool.config.diskimages: + # only can build disk images, not snapshots + raise Exception("Trying to upload a non disk-image-builder " + "image: %s" % self.args.image) with self.pool.getDB().getSession() as session: - self.pool.updateImage(session, provider.name, image.name) + if self.args.provider == 'all': + # iterate for all providers listed in label + for provider in self.pool.config.providers.values(): + for image in provider.images.values(): + if self.args.image == image.name: + self.pool.uploadImage(session, provider.name, + self.args.image) + break + else: + self.pool.uploadImage(session, self.args.provider, + self.args.image) def alien_list(self): self.pool.reconfigureManagers(self.pool.config) @@ -193,6 +271,12 @@ class NodePoolCmd(object): node.state = nodedb.DELETE self.list(node_id=node.id) + def dib_image_delete(self): + self.pool.reconfigureManagers(self.pool.config) + with self.pool.getDB().getSession() as session: + dib_image = session.getDibImage(self.args.id) + self.pool.deleteDibImage(dib_image) + def image_delete(self): self.pool.reconfigureManagers(self.pool.config) with self.pool.getDB().getSession() as session: diff --git a/nodepool/fakeprovider.py b/nodepool/fakeprovider.py index 0a839c81a..e38b1d269 100644 --- a/nodepool/fakeprovider.py +++ b/nodepool/fakeprovider.py @@ -92,7 +92,7 @@ class FakeHTTPClient(object): return None, dict(extensions=dict()) -class FakeClient(object): +class FakeNovaClient(object): def __init__(self): self.flavors = FakeList([ Dummy(id='f1', ram=8192, name='Fake Flavor'), @@ -103,6 +103,18 @@ class FakeClient(object): self.servers = FakeList([]) self.servers.api = self + +class FakeGlanceClient(object): + def __init__(self): + self.id = 'fake-glance-id' + + def update(self, **kwargs): + return True + + +class FakeClient(object): + def __init__(self): + self.client = FakeHTTPClient() self.client.user = 'fake' self.client.password = 'fake' self.client.projectid = 'fake' @@ -110,6 +122,9 @@ class FakeClient(object): self.client.service_name = None self.client.region_name = None + self.nova = FakeNovaClient() + self.glance = FakeGlanceClient() + class FakeFile(StringIO.StringIO): def __init__(self, path): @@ -188,4 +203,5 @@ class FakeJenkins(object): u'url': u'https://jenkins.example.com/view/test-view/'}]} return d + FAKE_CLIENT = FakeClient() diff --git a/nodepool/nodedb.py b/nodepool/nodedb.py index 4a7861c20..3878caa06 100644 --- a/nodepool/nodedb.py +++ b/nodepool/nodedb.py @@ -47,6 +47,20 @@ from sqlalchemy.orm import scoped_session, mapper, relationship, foreign from sqlalchemy.orm.session import Session, sessionmaker metadata = MetaData() + +dib_image_table = Table( + 'dib_image', metadata, + Column('id', Integer, primary_key=True), + Column('image_name', String(255), index=True, nullable=False), + # Image filename + Column('filename', String(255)), + # Version indicator (timestamp) + Column('version', Integer), + # One of the above values + Column('state', Integer), + # Time of last state change + Column('state_time', Integer), + ) snapshot_image_table = Table( 'snapshot_image', metadata, Column('id', Integer, primary_key=True), @@ -103,6 +117,32 @@ subnode_table = Table( ) +class DibImage(object): + def __init__(self, image_name, filename=None, version=None, + state=BUILDING): + self.image_name = image_name + self.filename = filename + self.version = version + self.state = state + + def delete(self): + session = Session.object_session(self) + session.delete(self) + session.commit() + + @property + def state(self): + return self._state + + @state.setter + def state(self, state): + self._state = state + self.state_time = int(time.time()) + session = Session.object_session(self) + if session: + session.commit() + + class SnapshotImage(object): def __init__(self, provider_name, image_name, hostname=None, version=None, external_id=None, server_external_id=None, state=BUILDING): @@ -212,6 +252,9 @@ mapper(Node, node_table, mapper(SnapshotImage, snapshot_image_table, properties=dict(_state=snapshot_image_table.c.state)) +mapper(DibImage, dib_image_table, + properties=dict(_state=dib_image_table.c.state)) + class NodeDatabase(object): def __init__(self, dburi): @@ -258,6 +301,10 @@ class NodeDatabaseSession(object): self.session().query(SnapshotImage).distinct( snapshot_image_table.c.provider_name).all()] + def getDibImages(self): + return self.session().query(DibImage).order_by( + dib_image_table.c.image_name).all() + def getImages(self, provider_name): return [ x.image_name for x in @@ -270,6 +317,21 @@ class NodeDatabaseSession(object): snapshot_image_table.c.provider_name, snapshot_image_table.c.image_name).all() + def getDibImage(self, image_id): + images = self.session().query(DibImage).filter_by( + id=image_id).all() + if not images: + return None + return images[0] + + def getBuildingDibImagesByName(self, image_name): + images = self.session().query(DibImage).filter( + dib_image_table.c.image_name == image_name, + dib_image_table.c.state == BUILDING).all() + if not images: + return None + return images + def getSnapshotImage(self, image_id): images = self.session().query(SnapshotImage).filter_by( id=image_id).all() @@ -285,6 +347,13 @@ class NodeDatabaseSession(object): return None return images[0] + def getOrderedReadyDibImages(self, image_name): + images = self.session().query(DibImage).filter( + dib_image_table.c.image_name == image_name, + dib_image_table.c.state == READY).order_by( + dib_image_table.c.version.desc()).all() + return images + def getOrderedReadySnapshotImages(self, provider_name, image_name): images = self.session().query(SnapshotImage).filter( snapshot_image_table.c.provider_name == provider_name, @@ -295,10 +364,17 @@ class NodeDatabaseSession(object): def getCurrentSnapshotImage(self, provider_name, image_name): images = self.getOrderedReadySnapshotImages(provider_name, image_name) + if not images: return None return images[0] + def createDibImage(self, *args, **kwargs): + new = DibImage(*args, **kwargs) + self.session().add(new) + self.commit() + return new + def createSnapshotImage(self, *args, **kwargs): new = SnapshotImage(*args, **kwargs) self.session().add(new) diff --git a/nodepool/nodepool.py b/nodepool/nodepool.py index a37ee823e..dde183525 100644 --- a/nodepool/nodepool.py +++ b/nodepool/nodepool.py @@ -23,8 +23,11 @@ import json import logging import os.path import paramiko +import Queue import random import re +import shlex +import subprocess import threading import time import yaml @@ -406,6 +409,7 @@ class NodeLauncher(threading.Thread): self.log.debug("Node id: %s is running, ip: %s, testing ssh" % (self.node.id, ip)) connect_kwargs = dict(key_filename=self.image.private_key) + if not utils.ssh_connect(ip, self.image.username, connect_kwargs=connect_kwargs, timeout=self.timeout): @@ -653,8 +657,8 @@ class SubNodeLauncher(threading.Thread): raise LaunchNetworkException("Unable to find public IP of server") self.subnode.ip = ip - self.log.debug("Subnode id: %s for node id: %s is running, ip: %s, " - "testing ssh" % + self.log.debug("Subnode id: %s for node id: %s is running, " + "ip: %s, testing ssh" % (self.subnode_id, self.node_id, ip)) connect_kwargs = dict(key_filename=self.image.private_key) if not utils.ssh_connect(ip, self.image.username, @@ -684,6 +688,8 @@ class ImageUpdater(threading.Thread): self.snap_image_id = snap_image_id self.nodepool = nodepool self.scriptdir = self.nodepool.config.scriptdir + self.elementsdir = self.nodepool.config.elementsdir + self.imagesdir = self.nodepool.config.imagesdir def run(self): try: @@ -718,6 +724,146 @@ class ImageUpdater(threading.Thread): self.snap_image.id) return + +class DiskImageBuilder(threading.Thread): + log = logging.getLogger("nodepool.DiskImageBuilderThread") + + def __init__(self, nodepool): + threading.Thread.__init__(self, name='DiskImageBuilder queue') + self.nodepool = nodepool + self.elementsdir = self.nodepool.config.elementsdir + self.imagesdir = self.nodepool.config.imagesdir + self.queue = nodepool._image_builder_queue + + def run(self): + while True: + # grabs image id from queue + self.disk_image = self.queue.get() + self.buildImage(self.disk_image) + self.queue.task_done() + + def _buildImage(self, image, filename): + if filename.startswith('./fake-dib-image'): + return True + + try: + env = os.environ.copy() + for k, v in os.environ.items(): + if k.startswith('NODEPOOL_'): + env[k] = v + + env['ELEMENTS_PATH'] = self.elementsdir + img_elements = '' + extra_options = '' + env['DIB_RELEASE'] = image.release + img_elements = image.elements + + if image.qemu_img_options: + extra_options = ('--qemu-img-options %s' % + image.qemu_img_options) + + cmd = ('disk-image-create -x --no-tmpfs %s -o %s %s' % + (extra_options, filename, img_elements)) + self.log.info('Running %s' % cmd) + + p = subprocess.Popen( + shlex.split(cmd), + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env=env) + (stdout, stderr) = p.communicate() + if self.log: + self.log.info(stdout) + if stderr: + for line in stderr: + if self.log: + self.log.error(line.rstrip()) + ret = p.returncode + if ret: + raise Exception("Unable to create %s" % filename) + except Exception: + self.log.exception("Exception in run method:") + + def buildImage(self, image_id): + with self.nodepool.getDB().getSession() as session: + self.dib_image = session.getDibImage(image_id) + self.log.info("Creating image: %s with filename %s" % + (self.dib_image.image_name, self.dib_image.filename)) + + start_time = time.time() + timestamp = int(start_time) + self.dib_image.version = timestamp + session.commit() + + # retrieve image details + image_details = \ + self.nodepool.config.diskimages[self.dib_image.image_name] + self._buildImage(image_details, self.dib_image.filename) + + if statsd: + dt = int((time.time() - start_time) * 1000) + key = 'nodepool.dib_image_build.%s' % self.dib_image.image_name + statsd.timing(key, dt) + statsd.incr(key) + + self.dib_image.state = nodedb.READY + session.commit() + self.log.info("Image %s is built" % self.dib_image.image_name) + + +class DiskImageUpdater(ImageUpdater): + + log = logging.getLogger("nodepool.DiskImageUpdater") + + def __init__(self, nodepool, provider, image, snap_image_id, + filename): + super(DiskImageUpdater, self).__init__(nodepool, provider, image, + snap_image_id) + self.filename = filename + self.image_name = image.name + + def updateImage(self, session): + start_time = time.time() + timestamp = int(start_time) + + dummy_image = type('obj', (object,), {'name': self.image_name}) + image_name = self.provider.template_hostname.format( + provider=self.provider, image=dummy_image, + timestamp=str(timestamp)) + + # TODO(mordred) abusing the hostname field + self.snap_image.hostname = image_name + self.snap_image.version = timestamp + session.commit() + + # strip extension from filename + stripped_filename = self.filename.replace(".qcow2", "") + image_id = self.manager.uploadImage(image_name, stripped_filename, + 'qcow2', 'bare') + self.snap_image.external_id = image_id + session.commit() + self.log.debug("Image id: %s building image %s" % + (self.snap_image.id, image_id)) + # It can take a _very_ long time for Rackspace 1.0 to save an image + self.manager.waitForImage(image_id, IMAGE_TIMEOUT) + + if statsd: + dt = int((time.time() - start_time) * 1000) + key = 'nodepool.image_update.%s.%s' % (self.image_name, + self.provider.name) + statsd.timing(key, dt) + statsd.incr(key) + + self.snap_image.state = nodedb.READY + session.commit() + self.log.info("Image %s in %s is ready" % (self.snap_image.hostname, + self.provider.name)) + + +class SnapshotImageUpdater(ImageUpdater): + + log = logging.getLogger("nodepool.SnapshotImageUpdater") + def updateImage(self, session): start_time = time.time() timestamp = int(start_time) @@ -914,6 +1060,10 @@ class GearmanServer(ConfigValue): pass +class DiskImage(ConfigValue): + pass + + class NodePool(threading.Thread): log = logging.getLogger("nodepool.NodePool") @@ -928,6 +1078,8 @@ class NodePool(threading.Thread): self.apsched = None self._delete_threads = {} self._delete_threads_lock = threading.Lock() + self._image_builder_queue = Queue.Queue() + self._image_builder_thread = None def stop(self): self._stopped = True @@ -940,6 +1092,10 @@ class NodePool(threading.Thread): if self.apsched: self.apsched.shutdown() + def waitForBuiltImages(self): + # wait on the queue until everything has been processed + self._image_builder_queue.join() + def loadConfig(self): self.log.debug("Loading configuration") config = yaml.load(open(self.configfile)) @@ -951,11 +1107,14 @@ class NodePool(threading.Thread): newconfig.targets = {} newconfig.labels = {} newconfig.scriptdir = config.get('script-dir') + newconfig.elementsdir = config.get('elements-dir') + newconfig.imagesdir = config.get('images-dir') newconfig.dburi = config.get('dburi') newconfig.provider_managers = {} newconfig.jenkins_managers = {} newconfig.zmq_publishers = {} newconfig.gearman_servers = {} + newconfig.diskimages = {} newconfig.crons = {} for name, default in [ @@ -982,6 +1141,18 @@ class NodePool(threading.Thread): g.name = g.host + '_' + str(g.port) newconfig.gearman_servers[g.name] = g + if 'diskimages' in config: + for diskimage in config['diskimages']: + d = DiskImage() + d.name = diskimage['name'] + newconfig.diskimages[d.name] = d + if 'elements' in diskimage: + d.elements = u' '.join(diskimage['elements']) + else: + d.elements = '' + d.release = diskimage.get('release', '') + d.qemu_img_options = diskimage.get('qemu-img-options', '') + for label in config['labels']: l = Label() l.name = label['name'] @@ -996,6 +1167,9 @@ class NodePool(threading.Thread): p.name = provider['name'] l.providers[p.name] = p + # if image is in diskimages, mark it to build once + l.is_diskimage = (l.image in newconfig.diskimages) + for provider in config['providers']: p = Provider() p.name = provider['name'] @@ -1025,11 +1199,12 @@ class NodePool(threading.Thread): i = ProviderImage() i.name = image['name'] p.images[i.name] = i - i.base_image = image['base-image'] + i.base_image = image.get('base-image', None) i.min_ram = image['min-ram'] i.name_filter = image.get('name-filter', None) - i.setup = image.get('setup') + i.setup = image.get('setup', None) i.reset = image.get('reset') + i.diskimage = image.get('diskimage', None) i.username = image.get('username', 'jenkins') i.private_key = image.get('private-key', '/var/lib/jenkins/.ssh/id_rsa') @@ -1227,6 +1402,13 @@ class NodePool(threading.Thread): self.gearman_client.addServer(g.host, g.port) self.gearman_client.waitForServer() + def reconfigureImageBuilder(self): + # start disk image builder thread + if not self._image_builder_thread: + self._image_builder_thread = DiskImageBuilder(self) + self._image_builder_thread.daemon = True + self._image_builder_thread.start() + def setConfig(self, config): self.config = config @@ -1354,12 +1536,14 @@ class NodePool(threading.Thread): def getNeededSubNodes(self, session): nodes_to_launch = [] for node in session.getNodes(): - expected_subnodes = self.config.labels[node.label_name].subnodes - active_subnodes = len([n for n in node.subnodes - if n.state != nodedb.DELETE]) - deficit = max(expected_subnodes - active_subnodes, 0) - if deficit: - nodes_to_launch.append((node, deficit)) + if node.label_name in self.config.labels: + expected_subnodes = \ + self.config.labels[node.label_name].subnodes + active_subnodes = len([n for n in node.subnodes + if n.state != nodedb.DELETE]) + deficit = max(expected_subnodes - active_subnodes, 0) + if deficit: + nodes_to_launch.append((node, deficit)) return nodes_to_launch def updateConfig(self): @@ -1370,9 +1554,11 @@ class NodePool(threading.Thread): self.reconfigureUpdateListeners(config) self.reconfigureGearmanClient(config) self.setConfig(config) + self.reconfigureImageBuilder() def startup(self): self.updateConfig() + # Currently nodepool can not resume building a node after a # restart. To clean up, mark all building nodes for deletion # when the daemon starts. @@ -1435,18 +1621,67 @@ class NodePool(threading.Thread): if label.min_ready < 0: # Label is configured to be disabled, skip creating the image. continue - for provider_name in label.providers: + + # check if image is there, if not, build it + if label.is_diskimage: found = False - for snap_image in session.getSnapshotImages(): - if (snap_image.provider_name == provider_name and - snap_image.image_name == label.image and - snap_image.state in [nodedb.READY, - nodedb.BUILDING]): + for dib_image in session.getDibImages(): + if (dib_image.image_name == label.image and + dib_image.state in [nodedb.READY, + nodedb.BUILDING]): + # if image is in ready state, check if image + # file exists in directory, otherwise we need + # to rebuild and delete this buggy image + if (dib_image.state == nodedb.READY and + not os.path.exists(dib_image.filename) and + not dib_image.filename.startswith( + './fake-dib-image')): + self.log.warning("Image filename %s does not " + "exist. Removing image" % + dib_image.filename) + self.deleteDibImage(dib_image) + continue + found = True + break if not found: - self.log.warning("Missing image %s on %s" % - (label.image, provider_name)) - self.updateImage(session, provider_name, label.image) + # only build the image, we'll recheck again + self.log.warning("Missing disk image %s" % label.image) + self.buildImage(self.config.diskimages[label.image]) + else: + # check for providers, to upload it + for provider_name in label.providers: + found = False + for snap_image in session.getSnapshotImages(): + if (snap_image.provider_name == provider_name and + snap_image.image_name == label.image and + snap_image.state in [nodedb.READY, + nodedb.BUILDING]): + found = True + break + if not found: + self.log.warning("Missing image %s on %s" % + (label.image, provider_name)) + # when we have a READY image, upload it + available_images = \ + session.getOrderedReadyDibImages(label.image) + if available_images: + self.uploadImage(session, provider_name, + label.image) + else: + # snapshots + for provider_name in label.providers: + found = False + for snap_image in session.getSnapshotImages(): + if (snap_image.provider_name == provider_name and + snap_image.image_name == label.image and + snap_image.state in [nodedb.READY, + nodedb.BUILDING]): + found = True + if not found: + self.log.warning("Missing image %s on %s" % + (label.image, provider_name)) + self.updateImage(session, provider_name, label.image) def _doUpdateImages(self): try: @@ -1458,9 +1693,30 @@ class NodePool(threading.Thread): def updateImages(self, session): # This function should be run periodically to create new snapshot # images. - for provider in self.config.providers.values(): - for image in provider.images.values(): - self.updateImage(session, provider.name, image.name) + needs_build = False + for label in self.config.labels.values(): + if label.min_ready < 0: + # Label is configured to be disabled, skip creating the image. + continue + + # check if image is there, if not, build it + if label.image.is_diskimage: + self.buildImage(self.pool.config.diskimages[label.image]) + needs_build = True + if needs_build: + # wait for all builds to finish, to have updated images to upload + self.waitForBuiltImages() + + for label in self.config.labels.values(): + if label.min_ready < 0: + # Label is configured to be disabled, skip creating the image. + continue + + for provider in label.providers: + if label.image.is_diskimage: + self.uploadImage(session, provider.name, label.image) + else: + self.updateImage(session, provider.name, label.image) def updateImage(self, session, provider_name, image_name): try: @@ -1470,18 +1726,90 @@ class NodePool(threading.Thread): "Could not update image %s on %s", image_name, provider_name) def _updateImage(self, session, provider_name, image_name): + # check type of image depending on label + is_diskimage = (image_name in self.config.diskimages) + if is_diskimage: + raise Exception( + "Cannot update disk image images. " + "Please build and upload images") + provider = self.config.providers[provider_name] image = provider.images[image_name] + if not image.setup: + raise Exception( + "Invalid image config. Must specify either " + "a setup script, or a diskimage to use.") snap_image = session.createSnapshotImage( provider_name=provider.name, - image_name=image.name) - t = ImageUpdater(self, provider, image, snap_image.id) + image_name=image_name) + + t = SnapshotImageUpdater(self, provider, image, snap_image.id) t.start() # Enough time to give them different timestamps (versions) # Just to keep things clearer. time.sleep(2) return t + def buildImage(self, image): + # check type of image depending on label + is_diskimage = (image.name in self.config.diskimages) + if not is_diskimage: + raise Exception( + "Cannot build non disk image images. " + "Please create snapshots for them") + + # check if we already have this item in the queue + with self.getDB().getSession() as session: + queued_images = session.getBuildingDibImagesByName(image.name) + if queued_images: + self.log.exception('Image %s is already being built' % + image.name) + else: + try: + start_time = time.time() + timestamp = int(start_time) + + filename = os.path.join(self.config.imagesdir, + '%s-%s' % + (image.name, str(timestamp))) + + self.log.debug("Queued image building task for %s" % + image.name) + dib_image = session.createDibImage(image_name=image.name, + filename=filename + + ".qcow2") + + # add this build to queue + self._image_builder_queue.put(dib_image.id) + except Exception: + self.log.exception( + "Could not build image %s", image.name) + + def uploadImage(self, session, provider, image_name): + images = session.getOrderedReadyDibImages(image_name) + if not images: + # raise error, no image ready for uploading + raise Exception( + "No image available for that upload. Please build one first.") + + try: + filename = images[0].filename + provider_entity = self.config.providers[provider] + provider_image = provider_entity.images[images[0].image_name] + snap_image = session.createSnapshotImage( + provider_name=provider, image_name=image_name) + t = DiskImageUpdater(self, provider_entity, provider_image, + snap_image.id, filename) + t.start() + + # Enough time to give them different timestamps (versions) + # Just to keep things clearer. + time.sleep(2) + return t + except Exception: + self.log.exception( + "Could not upload image %s on %s", image_name, provider) + def launchNode(self, session, provider, label, target): try: self._launchNode(session, provider, label, target) @@ -1560,8 +1888,11 @@ class NodePool(threading.Thread): self.updateStats(session, node.provider_name) provider = self.config.providers[node.provider_name] target = self.config.targets[node.target_name] - label = self.config.labels[node.label_name] - image = provider.images[label.image] + label = self.config.labels.get(node.label_name, None) + if label: + image_name = provider.images[label.image].name + else: + image_name = None manager = self.getProviderManager(provider) if target.jenkins_url: @@ -1601,7 +1932,7 @@ class NodePool(threading.Thread): if statsd: dt = int((time.time() - node.state_time) * 1000) - key = 'nodepool.delete.%s.%s.%s' % (image.name, + key = 'nodepool.delete.%s.%s.%s' % (image_name, node.provider_name, node.target_name) statsd.timing(key, dt) @@ -1638,6 +1969,15 @@ class NodePool(threading.Thread): snap_image.delete() self.log.info("Deleted image id: %s" % snap_image.id) + def deleteDibImage(self, dib_image): + # Delete a dib image and it's associated file + if os.path.exists(dib_image.filename): + os.remove(dib_image.filename) + + dib_image.state = nodedb.DELETE + dib_image.delete() + self.log.info("Deleted dib image id: %s" % dib_image.id) + def _doPeriodicCleanup(self): try: self.periodicCleanup() @@ -1657,11 +1997,14 @@ class NodePool(threading.Thread): node_ids = [] image_ids = [] + dib_image_ids = [] with self.getDB().getSession() as session: for node in session.getNodes(): node_ids.append(node.id) for image in session.getSnapshotImages(): image_ids.append(image.id) + for dib_image in session.getDibImages(): + dib_image_ids.append(dib_image.id) for node_id in node_ids: try: @@ -1681,6 +2024,15 @@ class NodePool(threading.Thread): except Exception: self.log.exception("Exception cleaning up image id %s:" % image_id) + + for dib_image_id in dib_image_ids: + try: + with self.getDB().getSession() as session: + dib_image = session.getDibImage(dib_image_id) + self.cleanupOneDibImage(session, dib_image) + except Exception: + self.log.exception("Exception cleaning up image id %s:" % + dib_image_id) self.log.debug("Finished periodic cleanup") def cleanupOneNode(self, session, node): @@ -1727,7 +2079,7 @@ class NodePool(threading.Thread): if len(images) > 1: previous = images[1] if (image != current and image != previous and - (now - image.state_time) > IMAGE_CLEANUP): + (now - image.state_time) > IMAGE_CLEANUP): self.log.info("Deleting image id: %s which is " "%s hours old" % (image.id, @@ -1740,6 +2092,35 @@ class NodePool(threading.Thread): self.log.exception("Exception deleting image id: %s:" % image.id) + def cleanupOneDibImage(self, session, image): + delete = False + now = time.time() + if (image.image_name not in self.config.diskimages): + delete = True + self.log.info("Deleting image id: %s which has no current " + "base image" % image.id) + else: + images = session.getOrderedReadyDibImages( + image.image_name) + current = previous = None + if len(images) > 0: + current = images[0] + if len(images) > 1: + previous = images[1] + if (image != current and image != previous and + (now - image.state_time) > IMAGE_CLEANUP): + self.log.info("Deleting image id: %s which is " + "%s hours old" % + (image.id, + (now - image.state_time) / (60 * 60))) + delete = True + if delete: + try: + self.deleteDibImage(image) + except Exception: + self.log.exception("Exception deleting image id: %s:" % + image.id) + def _doPeriodicCheck(self): try: with self.getDB().getSession() as session: @@ -1755,18 +2136,22 @@ class NodePool(threading.Thread): for node in session.getNodes(): if node.state != nodedb.READY: continue - try: - provider = self.config.providers[node.provider_name] + provider = self.config.providers[node.provider_name] + if node.label_name in self.config.labels: label = self.config.labels[node.label_name] image = provider.images[label.image] connect_kwargs = dict(key_filename=image.private_key) - if utils.ssh_connect(node.ip, image.username, - connect_kwargs=connect_kwargs): - continue - except Exception: - self.log.exception("SSH Check failed for node id: %s" % - node.id) - self.deleteNode(node.id) + try: + if utils.ssh_connect(node.ip, image.username, + connect_kwargs=connect_kwargs): + continue + except Exception: + self.log.exception("SSH Check failed for node id: %s" % + node.id) + else: + self.log.exception("Node with non-existing label %s" % + node.label_name) + self.deleteNode(node.id) self.log.debug("Finished periodic check") def updateStats(self, session, provider_name): diff --git a/nodepool/provider_manager.py b/nodepool/provider_manager.py index 76eff77d6..4544ae43c 100644 --- a/nodepool/provider_manager.py +++ b/nodepool/provider_manager.py @@ -23,6 +23,9 @@ import novaclient.client import novaclient.extension import novaclient.v1_1.contrib.tenant_networks import threading +import glanceclient +import glanceclient.client +import keystoneclient.v2_0.client as ksclient import time import fakeprovider @@ -93,14 +96,14 @@ class NotFound(Exception): class CreateServerTask(Task): def main(self, client): - server = client.servers.create(**self.args) + server = client.nova.servers.create(**self.args) return str(server.id) class GetServerTask(Task): def main(self, client): try: - server = client.servers.get(self.args['server_id']) + server = client.nova.servers.get(self.args['server_id']) except novaclient.exceptions.NotFound: raise NotFound() return make_server_dict(server) @@ -108,52 +111,52 @@ class GetServerTask(Task): class DeleteServerTask(Task): def main(self, client): - client.servers.delete(self.args['server_id']) + client.nova.servers.delete(self.args['server_id']) class ListServersTask(Task): def main(self, client): - servers = client.servers.list() + servers = client.nova.servers.list() return [make_server_dict(server) for server in servers] class AddKeypairTask(Task): def main(self, client): - client.keypairs.create(**self.args) + client.nova.keypairs.create(**self.args) class ListKeypairsTask(Task): def main(self, client): - keys = client.keypairs.list() + keys = client.nova.keypairs.list() return [dict(id=str(key.id), name=key.name) for key in keys] class DeleteKeypairTask(Task): def main(self, client): - client.keypairs.delete(self.args['name']) + client.nova.keypairs.delete(self.args['name']) class CreateFloatingIPTask(Task): def main(self, client): - ip = client.floating_ips.create(**self.args) + ip = client.nova.floating_ips.create(**self.args) return dict(id=str(ip.id), ip=ip.ip) class AddFloatingIPTask(Task): def main(self, client): - client.servers.add_floating_ip(**self.args) + client.nova.servers.add_floating_ip(**self.args) class GetFloatingIPTask(Task): def main(self, client): - ip = client.floating_ips.get(self.args['ip_id']) + ip = client.nova.floating_ips.get(self.args['ip_id']) return dict(id=str(ip.id), ip=ip.ip, instance_id=str(ip.instance_id)) class ListFloatingIPsTask(Task): def main(self, client): - ips = client.floating_ips.list() + ips = client.nova.floating_ips.list() return [dict(id=str(ip.id), ip=ip.ip, instance_id=str(ip.instance_id)) for ip in ips] @@ -161,24 +164,39 @@ class ListFloatingIPsTask(Task): class RemoveFloatingIPTask(Task): def main(self, client): - client.servers.remove_floating_ip(**self.args) + client.nova.servers.remove_floating_ip(**self.args) class DeleteFloatingIPTask(Task): def main(self, client): - client.floating_ips.delete(self.args['ip_id']) + client.nova.floating_ips.delete(self.args['ip_id']) class CreateImageTask(Task): def main(self, client): # This returns an id - return str(client.servers.create_image(**self.args)) + return str(client.nova.servers.create_image(**self.args)) + + +class UploadImageTask(Task): + def main(self, client): + if self.args['image_name'].startswith('fake-dib-image'): + image = fakeprovider.FakeGlanceClient() + image.update(data='fake') + else: + image = client.glance.images.create( + name=self.args['image_name'], is_public=False, + disk_format=self.args['disk_format'], + container_format=self.args['container_format']) + image.update(data=open(self.args['filename'], 'rb')) + + return image.id class GetImageTask(Task): def main(self, client): try: - image = client.images.get(**self.args) + image = client.nova.images.get(**self.args) except novaclient.exceptions.NotFound: raise NotFound() # HP returns 404, rackspace can return a 'DELETED' image. @@ -190,7 +208,7 @@ class GetImageTask(Task): class ListExtensionsTask(Task): def main(self, client): try: - resp, body = client.client.get('/extensions') + resp, body = client.nova.client.get('/extensions') return [x['alias'] for x in body['extensions']] except novaclient.exceptions.NotFound: # No extensions present. @@ -199,26 +217,32 @@ class ListExtensionsTask(Task): class ListFlavorsTask(Task): def main(self, client): - flavors = client.flavors.list() + flavors = client.nova.flavors.list() return [dict(id=str(flavor.id), ram=flavor.ram, name=flavor.name) for flavor in flavors] class ListImagesTask(Task): def main(self, client): - images = client.images.list() + images = client.nova.images.list() return [make_image_dict(image) for image in images] class FindImageTask(Task): def main(self, client): - image = client.images.find(**self.args) + image = client.nova.images.find(**self.args) return dict(id=str(image.id)) class DeleteImageTask(Task): def main(self, client): - client.images.delete(**self.args) + client.nova.images.delete(**self.args) + + +class ClientContainer(object): + def __init__(self, nova, glance): + self.nova = nova + self.glance = glance class FindNetworkTask(Task): @@ -264,18 +288,47 @@ class ProviderManager(TaskManager): def _getClient(self): tenant_networks = novaclient.extension.Extension( 'tenant_networks', novaclient.v1_1.contrib.tenant_networks) - args = ['1.1', self.provider.username, self.provider.password, - self.provider.project_id, self.provider.auth_url] - kwargs = {'extensions': [tenant_networks]} + + nova_kwargs = {} + keystone_kwargs = {} + glance_kwargs = {} + + # specific args for client + nova_kwargs['auth_url'] = keystone_kwargs['auth_url'] = \ + self.provider.auth_url + nova_kwargs['extensions'] = [tenant_networks] + + keystone_kwargs['username'] = self.provider.username + keystone_kwargs['password'] = self.provider.password + keystone_kwargs['tenant_name'] = self.provider.project_id + if self.provider.service_type: - kwargs['service_type'] = self.provider.service_type + nova_kwargs['service_type'] = self.provider.service_type + glance_kwargs['service_type'] = 'image' if self.provider.service_name: - kwargs['service_name'] = self.provider.service_name + nova_kwargs['service_name'] = self.provider.service_name if self.provider.region_name: - kwargs['region_name'] = self.provider.region_name + nova_kwargs['region_name'] = keystone_kwargs['region_name'] = \ + self.provider.region_name if self.provider.auth_url == 'fake': return fakeprovider.FAKE_CLIENT - return novaclient.client.Client(*args, **kwargs) + + nova = novaclient.client.Client( + '1.1', self.provider.username, self.provider.password, + self.provider.project_id, **nova_kwargs) + + keystone = ksclient.Client(**keystone_kwargs) + + glance_endpoint = keystone.service_catalog.url_for( + attr='region', + filter_value=keystone_kwargs['region_name'], + service_type='image') + glance_endpoint = glance_endpoint.replace("/v1.0", "") + glance = glanceclient.client.Client( + '1', glance_endpoint, token=keystone.auth_token, + **glance_kwargs) + + return ClientContainer(nova, glance) def _getFlavors(self): flavors = self.listFlavors() @@ -405,6 +458,8 @@ class ProviderManager(TaskManager): return def waitForImage(self, image_id, timeout=3600): + if image_id == 'fake-glance-id': + return True return self._waitForResource('image', image_id, timeout) def createFloatingIP(self, pool=None): @@ -442,6 +497,11 @@ class ProviderManager(TaskManager): def getImage(self, image_id): return self.submitTask(GetImageTask(image=image_id)) + def uploadImage(self, image_name, filename, disk_format, container_format): + return self.submitTask(UploadImageTask( + image_name=image_name, filename='%s.%s' % (filename, disk_format), + disk_format=disk_format, container_format=container_format)) + def listExtensions(self): return self.submitTask(ListExtensionsTask()) @@ -509,7 +569,7 @@ class ProviderManager(TaskManager): self.deleteFloatingIP(ip['id']) if (self.hasExtension('os-keypairs') and - server['key_name'] != self.provider.keypair): + server['key_name'] != self.provider.keypair): for kp in self.listKeypairs(): if kp['name'] == server['key_name']: self.log.debug('Deleting keypair for server %s' % diff --git a/nodepool/tests/fixtures/node_dib.yaml b/nodepool/tests/fixtures/node_dib.yaml new file mode 100644 index 000000000..3443cfcfa --- /dev/null +++ b/nodepool/tests/fixtures/node_dib.yaml @@ -0,0 +1,56 @@ +script-dir: . +elements-dir: . +images-dir: . + +dburi: '{dburi}' + +cron: + check: '*/15 * * * *' + cleanup: '*/1 * * * *' + image-update: '14 2 * * *' + +zmq-publishers: + - tcp://localhost:8881 + +#gearman-servers: +# - host: localhost + +labels: + - name: fake-dib-label + image: fake-dib-image + min-ready: 1 + providers: + - name: fake-dib-provider + +providers: + - name: fake-dib-provider + keypair: 'if-present-use-this-keypair' + username: 'fake' + password: 'fake' + auth-url: 'fake' + project-id: 'fake' + max-servers: 96 + pool: 'fake' + networks: + - net-id: 'some-uuid' + rate: 0.0001 + images: + - name: fake-dib-image + base-image: 'Fake Precise' + min-ram: 8192 + diskimage: fake-dib-image + +targets: + - name: fake-target + jenkins: + url: https://jenkins.example.org/ + user: fake + apikey: fake + +diskimages: + - name: fake-dib-image + elements: + - ubuntu + - vm + release: precise + diff --git a/nodepool/tests/test_nodepool.py b/nodepool/tests/test_nodepool.py index 73f2fd4e2..b459fb5b7 100644 --- a/nodepool/tests/test_nodepool.py +++ b/nodepool/tests/test_nodepool.py @@ -42,8 +42,10 @@ class TestNodepool(tests.DBTestCase): 'Gearman client connect', 'Gearman client poll', 'fake-provider', + 'fake-dib-provider', 'fake-jenkins', 'fake-target', + 'DiskImageBuilder queue', ] while True: @@ -93,6 +95,22 @@ class TestNodepool(tests.DBTestCase): self.assertEqual(len(nodes), 1) pool.stop() + def test_dib_node(self): + """Test that a dib image and node are created""" + configfile = self.setup_config('node_dib.yaml') + pool = nodepool.nodepool.NodePool(configfile, watermark_sleep=1) + pool.start() + time.sleep(3) + self.waitForNodes(pool) + + with pool.getDB().getSession() as session: + nodes = session.getNodes(provider_name='fake-dib-provider', + label_name='fake-dib-label', + target_name='fake-target', + state=nodedb.READY) + self.assertEqual(len(nodes), 1) + pool.stop() + def test_subnodes(self): """Test that an image and node are created""" configfile = self.setup_config('subnodes.yaml') diff --git a/requirements.txt b/requirements.txt index 2a5d3edce..bb295c3a0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,7 +11,10 @@ statsd>=1.0.0,<3.0 apscheduler>=2.1.1,<3.0 sqlalchemy>=0.8.2,<0.9.0 pyzmq>=13.1.0,<14.0.0 +python-glanceclient +python-keystoneclient python-novaclient MySQL-python PrettyTable>=0.6,<0.8 six>=1.4.1 +diskimage-builder