
To support putting fewer things into images via Puppet in Infra, we'd like to be able to pre-populate our clouds with keypairs for the infra-root accounts and have nova add those at boot time.

Change-Id: I72cfffacdeb56e3e9743e47a107fb309a7517935
#!/usr/bin/env python

# Copyright (C) 2011-2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
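
"""Nodepool: a service to manage a pool of single-use cloud nodes.

The daemon computes node demand from Gearman queue status, listens
for Jenkins build events over ZMQ, boots servers on cloud providers
from the images recorded in ZooKeeper, and deletes each node once
the job it ran has completed.
"""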

import apscheduler.schedulers.background
import apscheduler.triggers.cron
import gear
import json
import logging
import os.path
import paramiko
import pprint
import random
import threading
import time
import zmq

import allocation
import jenkins_manager
import nodedb
import exceptions
import nodeutils as utils
import provider_manager
import stats
import config as nodepool_config

import jobs
import zk

MINS = 60
HOURS = 60 * MINS

WATERMARK_SLEEP = 10         # Interval between checking if new servers needed
IMAGE_TIMEOUT = 6 * HOURS    # How long to wait for an image save
CONNECT_TIMEOUT = 10 * MINS  # How long to try to connect after a server
                             # is ACTIVE
NODE_CLEANUP = 8 * HOURS     # When to start deleting a node that is not
                             # READY or HOLD
TEST_CLEANUP = 5 * MINS      # When to start deleting a node that is in TEST
IMAGE_CLEANUP = 8 * HOURS    # When to start deleting an image that is not
                             # READY or is not the current or previous image
DELETE_DELAY = 1 * MINS      # Delay before deleting a node that has completed
                             # its job.


class LaunchNodepoolException(Exception):
    statsd_key = 'error.nodepool'


class LaunchStatusException(Exception):
    statsd_key = 'error.status'


class LaunchNetworkException(Exception):
    statsd_key = 'error.network'


class LaunchAuthException(Exception):
    statsd_key = 'error.auth'


class NodeCompleteThread(threading.Thread):
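    """Handle the completion of a build on a node.

    Records per-job statsd runtime metrics, holds the node if its job
    is configured with hold-on-failure, relabels or deletes nodes that
    ran an acceptance test job, and otherwise schedules the node for
    deletion after DELETE_DELAY.
    """
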
    log = logging.getLogger("nodepool.NodeCompleteThread")

    def __init__(self, nodepool, nodename, jobname, result, branch):
        threading.Thread.__init__(self, name='NodeCompleteThread for %s' %
                                  nodename)
        self.nodename = nodename
        self.nodepool = nodepool
        self.jobname = jobname
        self.result = result
        self.branch = branch
        self.statsd = stats.get_client()

    def run(self):
        try:
            with self.nodepool.getDB().getSession() as session:
                self.handleEvent(session)
        except Exception:
            self.log.exception("Exception handling event for %s:" %
                               self.nodename)

    def handleEvent(self, session):
        node = session.getNodeByNodename(self.nodename)
        if not node:
            self.log.debug("Unable to find node with nodename: %s" %
                           self.nodename)
            return

        if node.state == nodedb.HOLD:
            self.log.info("Node id: %s is complete but in HOLD state" %
                          node.id)
            return

        nodepool_job = session.getJobByName(self.jobname)
        if (nodepool_job and nodepool_job.hold_on_failure and
                self.result != 'SUCCESS'):
            held_nodes = session.getNodes(state=nodedb.HOLD)
            held_nodes = [n for n in held_nodes if self.jobname in n.comment]
            if len(held_nodes) >= nodepool_job.hold_on_failure:
                self.log.info("Node id: %s has failed %s but %s nodes "
                              "are already held for that job" % (
                                  node.id, self.jobname, len(held_nodes)))
            else:
                node.state = nodedb.HOLD
                node.comment = "Automatically held after failing %s" % (
                    self.jobname,)
                self.log.info("Node id: %s failed %s, automatically holding" % (
                    node.id, self.jobname))
                self.nodepool.updateStats(session, node.provider_name)
                return

        target = self.nodepool.config.targets[node.target_name]
        if self.jobname == target.jenkins_test_job:
            self.log.debug("Test job for node id: %s complete, result: %s" %
                           (node.id, self.result))
            if self.result == 'SUCCESS':
                jenkins = self.nodepool.getJenkinsManager(target)
                old = jenkins.relabelNode(node.nodename, [node.image_name])
                if not old:
                    old = '[unlabeled]'
                self.log.info("Relabeled jenkins node id: %s from %s to %s" %
                              (node.id, old, node.image_name))
                node.state = nodedb.READY
                self.log.info("Node id: %s is ready" % node.id)
                self.nodepool.updateStats(session, node.provider_name)
                return
            self.log.info("Node id: %s failed acceptance test, deleting" %
                          node.id)

        if self.statsd and self.result == 'SUCCESS':
            start = node.state_time
            dt = int((time.time() - start) * 1000)

            # nodepool.job.tempest
            key = 'nodepool.job.%s' % self.jobname
            self.statsd.timing(key + '.runtime', dt)
            self.statsd.incr(key + '.builds')

            # nodepool.job.tempest.master
            key += '.%s' % self.branch
            self.statsd.timing(key + '.runtime', dt)
            self.statsd.incr(key + '.builds')

            # nodepool.job.tempest.master.devstack-precise
            key += '.%s' % node.label_name
            self.statsd.timing(key + '.runtime', dt)
            self.statsd.incr(key + '.builds')

            # nodepool.job.tempest.master.devstack-precise.rax-ord
            key += '.%s' % node.provider_name
            self.statsd.timing(key + '.runtime', dt)
            self.statsd.incr(key + '.builds')

        time.sleep(DELETE_DELAY)
        self.nodepool.deleteNode(node.id)


class NodeUpdateListener(threading.Thread):
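    """Listen for Jenkins build events published over ZMQ.

    onStarted events mark a node as USED; onFinalized events hand off
    to a NodeCompleteThread, which may hold, relabel, or delete the
    node.
    """
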
    log = logging.getLogger("nodepool.NodeUpdateListener")

    def __init__(self, nodepool, addr):
        threading.Thread.__init__(self, name='NodeUpdateListener')
        self.nodepool = nodepool
        self.socket = self.nodepool.zmq_context.socket(zmq.SUB)
        self.socket.RCVTIMEO = 1000
        event_filter = b""
        self.socket.setsockopt(zmq.SUBSCRIBE, event_filter)
        self.socket.connect(addr)
        self._stopped = False

    def run(self):
        while not self._stopped:
            try:
                m = self.socket.recv().decode('utf-8')
            except zmq.error.Again:
                continue
            try:
                topic, data = m.split(None, 1)
                self.handleEvent(topic, data)
            except Exception:
                self.log.exception("Exception handling job:")

    def stop(self):
        self._stopped = True

    def handleEvent(self, topic, data):
        self.log.debug("Received: %s %s" % (topic, data))
        args = json.loads(data)
        build = args['build']
        if 'node_name' not in build:
            return
        jobname = args['name']
        nodename = args['build']['node_name']
        if topic == 'onStarted':
            self.handleStartPhase(nodename, jobname)
        elif topic == 'onCompleted':
            pass
        elif topic == 'onFinalized':
            result = args['build'].get('status')
            params = args['build'].get('parameters')
            if params:
                branch = params.get('ZUUL_BRANCH', 'unknown_branch')
            else:
                branch = 'unknown_branch'
            self.handleCompletePhase(nodename, jobname, result, branch)
        else:
            raise Exception("Received job for unhandled phase: %s" %
                            topic)

    def handleStartPhase(self, nodename, jobname):
        with self.nodepool.getDB().getSession() as session:
            node = session.getNodeByNodename(nodename)
            if not node:
                self.log.debug("Unable to find node with nodename: %s" %
                               nodename)
                return

            target = self.nodepool.config.targets[node.target_name]
            if jobname == target.jenkins_test_job:
                self.log.debug("Test job for node id: %s started" % node.id)
                return

            # Preserve the HOLD state even if a job starts on the node.
            if node.state != nodedb.HOLD:
                self.log.info("Setting node id: %s to USED" % node.id)
                node.state = nodedb.USED
            self.nodepool.updateStats(session, node.provider_name)

    def handleCompletePhase(self, nodename, jobname, result, branch):
        t = NodeCompleteThread(self.nodepool, nodename, jobname, result,
                               branch)
        t.start()


class GearmanClient(gear.Client):
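    """A Gearman client that computes per-label node demand.

    getNeededWorkers() polls each server's queue status and sums the
    queued counts of build:<job>:<node-type> functions into a map of
    node type -> number of workers needed.
    """
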
    def __init__(self):
        super(GearmanClient, self).__init__(client_id='nodepool')
        self.__log = logging.getLogger("nodepool.GearmanClient")

    def getNeededWorkers(self):
        needed_workers = {}
        job_worker_map = {}
        unspecified_jobs = {}
        for connection in self.active_connections:
            try:
                req = gear.StatusAdminRequest()
                connection.sendAdminRequest(req, timeout=300)
            except Exception:
                self.__log.exception("Exception while listing functions")
                self._lostConnection(connection)
                continue
            for line in req.response.split('\n'):
                parts = [x.strip() for x in line.split('\t')]
                # parts[0] - function name
                # parts[1] - total jobs queued (including building)
                # parts[2] - jobs building
                # parts[3] - workers registered
                if not parts or parts[0] == '.':
                    continue
                if not parts[0].startswith('build:'):
                    continue
                function = parts[0][len('build:'):]
                # total jobs in queue (including building jobs)
                # NOTE(jhesketh): Jobs that are being built are accounted for
                # in the demand algorithm by subtracting the running nodes.
                # If there are foreign (to nodepool) workers accepting jobs
                # the demand will be higher than actually required. However
                # better to have too many than too few and if you have a
                # foreign worker this may be desired.
                try:
                    queued = int(parts[1])
                except ValueError as e:
                    self.__log.warn(
                        'Server returned non-integer value in status. (%s)' %
                        str(e))
                    queued = 0
                if queued > 0:
                    self.__log.debug("Function: %s queued: %s" % (function,
                                                                  queued))
                if ':' in function:
                    fparts = function.split(':')
                    # fparts[0] - function name
                    # fparts[1] - target node [type]
                    job = fparts[-2]
                    worker = fparts[-1]
                    workers = job_worker_map.get(job, [])
                    workers.append(worker)
                    job_worker_map[job] = workers
                    if queued > 0:
                        needed_workers[worker] = (
                            needed_workers.get(worker, 0) + queued)
                elif queued > 0:
                    job = function
                    unspecified_jobs[job] = (unspecified_jobs.get(job, 0) +
                                             queued)
        for job, queued in unspecified_jobs.items():
            workers = job_worker_map.get(job)
            if not workers:
                continue
            worker = workers[0]
            needed_workers[worker] = (needed_workers.get(worker, 0) +
                                      queued)
        return needed_workers

    def handleWorkComplete(self, packet):
        job = super(GearmanClient, self).handleWorkComplete(packet)
        job.onCompleted()

    def handleWorkFail(self, packet):
        job = super(GearmanClient, self).handleWorkFail(packet)
        job.onFailed()

    def handleWorkException(self, packet):
        job = super(GearmanClient, self).handleWorkException(packet)
        job.onFailed()

    def handleDisconnect(self, job):
        super(GearmanClient, self).handleDisconnect(job)
        job.onDisconnect()

    def handleWorkStatus(self, packet):
        job = super(GearmanClient, self).handleWorkStatus(packet)
        job.onWorkStatus()


class InstanceDeleter(threading.Thread):
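    """Delete a single provider server in a background thread."""
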
    log = logging.getLogger("nodepool.InstanceDeleter")

    def __init__(self, nodepool, provider_name, external_id):
        threading.Thread.__init__(self, name='InstanceDeleter for %s %s' %
                                  (provider_name, external_id))
        self.nodepool = nodepool
        self.provider_name = provider_name
        self.external_id = external_id

    def run(self):
        try:
            self.nodepool._deleteInstance(self.provider_name,
                                          self.external_id)
        except Exception:
            self.log.exception("Exception deleting instance %s from %s:" %
                               (self.external_id, self.provider_name))


class NodeDeleter(threading.Thread):
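    """Delete a single nodepool node in a background thread."""
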
    log = logging.getLogger("nodepool.NodeDeleter")

    def __init__(self, nodepool, node_id):
        threading.Thread.__init__(self, name='NodeDeleter for %s' % node_id)
        self.node_id = node_id
        self.nodepool = nodepool

    def run(self):
        try:
            with self.nodepool.getDB().getSession() as session:
                node = session.getNode(self.node_id)
                self.nodepool._deleteNode(session, node)
        except Exception:
            self.log.exception("Exception deleting node %s:" %
                               self.node_id)


class NodeLauncher(threading.Thread):
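    """Launch a node on a provider and ready it for use.

    Boots a server from the most recent image upload for the label,
    waits for it to become ACTIVE and reachable over SSH, writes
    /etc/nodepool metadata, runs any ready script, and registers the
    node with Jenkins or assigns it via Gearman as the target requires.
    """
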
    log = logging.getLogger("nodepool.NodeLauncher")

    def __init__(self, nodepool, provider, label, target, node_id, timeout,
                 launch_timeout):
        threading.Thread.__init__(self, name='NodeLauncher for %s' % node_id)
        self.provider = provider
        self.label = label
        self.image = provider.images[label.image]
        self.target = target
        self.node_id = node_id
        self.timeout = timeout
        self.nodepool = nodepool
        self.launch_timeout = launch_timeout

    def run(self):
        try:
            self._run()
        except Exception:
            self.log.exception("Exception in run method:")

    def _run(self):
        with self.nodepool.getDB().getSession() as session:
            self.log.debug("Launching node id: %s" % self.node_id)
            try:
                self.node = session.getNode(self.node_id)
                self.manager = self.nodepool.getProviderManager(self.provider)
            except Exception:
                self.log.exception("Exception preparing to launch node id: %s:"
                                   % self.node_id)
                return

            try:
                start_time = time.time()
                dt = self.launchNode(session)
                failed = False
                statsd_key = 'ready'
                self.log.debug('Node %s ready in provider: %s' %
                               (self.node_id, self.provider.name))
            except exceptions.TimeoutException as e:
                # Don't log exception for timeouts. Each one has
                # a specific Exception, and we know it's a timeout, so
                # the traceback in the log is just noise
                self.log.error("Timeout launching node id: %s "
                               "in provider: %s error: %s" %
                               (self.node_id, self.provider.name,
                                str(e)))
                dt = int((time.time() - start_time) * 1000)
                failed = True
                statsd_key = e.statsd_key
            except Exception as e:
                self.log.exception("%s launching node id: %s "
                                   "in provider: %s error:" %
                                   (e.__class__.__name__,
                                    self.node_id, self.provider.name))
                dt = int((time.time() - start_time) * 1000)
                failed = True
                if hasattr(e, 'statsd_key'):
                    statsd_key = e.statsd_key
                else:
                    statsd_key = 'error.unknown'

            try:
                self.nodepool.launchStats(statsd_key, dt, self.image.name,
                                          self.provider.name,
                                          self.target.name,
                                          self.node.az,
                                          self.node.manager_name)
            except Exception:
                self.log.exception("Exception reporting launch stats:")

            if failed:
                try:
                    self.nodepool.deleteNode(self.node_id)
                except Exception:
                    self.log.exception("Exception deleting node id: %s:" %
                                       self.node_id)

    def launchNode(self, session):
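        """Launch this thread's server and return the elapsed ms.

        Raises a Launch*Exception (each carries a statsd_key) if the
        server fails to boot, obtain a public IP, or answer SSH.
        """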
        start_time = time.time()
        timestamp = int(start_time)

        hostname = self.target.hostname.format(
            label=self.label, provider=self.provider, node_id=self.node.id,
            timestamp=str(timestamp))
        self.node.hostname = hostname
        self.node.nodename = hostname.split('.')[0]
        self.node.target_name = self.target.name

        cloud_image = self.nodepool.zk.getMostRecentImageUpload(
            self.image.name, self.provider.name)
        if not cloud_image:
            raise LaunchNodepoolException("Unable to find current cloud "
                                          "image %s in %s" %
                                          (self.image.name,
                                           self.provider.name))

        self.log.info("Creating server with hostname %s in %s from image %s "
                      "for node id: %s" % (hostname, self.provider.name,
                                           self.image.name, self.node_id))
        server = self.manager.createServer(
            hostname, self.image.min_ram, cloud_image.external_id,
            name_filter=self.image.name_filter, az=self.node.az,
            key_name=self.image.key_name,
            config_drive=self.image.config_drive,
            nodepool_node_id=self.node_id,
            nodepool_image_name=self.image.name)
        server_id = server['id']
        self.node.external_id = server_id
        session.commit()

        self.log.debug("Waiting for server %s for node id: %s" %
                       (server_id, self.node.id))
        server = self.manager.waitForServer(server, self.launch_timeout)
        if server['status'] != 'ACTIVE':
            raise LaunchStatusException("Server %s for node id: %s "
                                        "status: %s" %
                                        (server_id, self.node.id,
                                         server['status']))

        ip = server.get('public_v4')
        ip_v6 = server.get('public_v6')
        if self.provider.ipv6_preferred:
            if ip_v6:
                ip = ip_v6
            else:
                self.log.warning('Preferred ipv6 not available, '
                                 'falling back to ipv4.')
        if not ip:
            self.log.debug(
                "Server data for failed IP: %s" % pprint.pformat(
                    server))
            raise LaunchNetworkException("Unable to find public IP of server")

        self.node.ip_private = server.get('private_v4')
        # devstack-gate multi-node depends on private_v4 being populated
        # with something. On clouds that don't have a private address, use
        # the public.
        if not self.node.ip_private:
            self.node.ip_private = server.get('public_v4')
        self.node.ip = ip
        self.log.debug("Node id: %s is running, ipv4: %s, ipv6: %s" %
                       (self.node.id, server.get('public_v4'),
                        server.get('public_v6')))

        self.log.debug("Node id: %s testing ssh at ip: %s" %
                       (self.node.id, ip))
        connect_kwargs = dict(key_filename=self.image.private_key)
        if not utils.ssh_connect(ip, self.image.username,
                                 connect_kwargs=connect_kwargs,
                                 timeout=self.timeout):
            raise LaunchAuthException("Unable to connect via ssh")

        # Save the elapsed time for statsd
        dt = int((time.time() - start_time) * 1000)

        if self.label.subnodes:
            self.log.info("Node id: %s is waiting on subnodes" % self.node.id)

            while ((time.time() - start_time) < (NODE_CLEANUP - 60)):
                session.commit()
                ready_subnodes = [n for n in self.node.subnodes
                                  if n.state == nodedb.READY]
                if len(ready_subnodes) == self.label.subnodes:
                    break
                time.sleep(5)

        nodelist = []
        for subnode in self.node.subnodes:
            nodelist.append(('sub', subnode))
        nodelist.append(('primary', self.node))

        self.writeNodepoolInfo(nodelist)
        if self.label.ready_script:
            self.runReadyScript(nodelist)

        # Do this before adding to jenkins to avoid a race where
        # Jenkins might immediately use the node before we've updated
        # the state:
        if self.target.jenkins_test_job:
            self.node.state = nodedb.TEST
            self.log.info("Node id: %s is in testing" % self.node.id)
        else:
            self.node.state = nodedb.READY
            self.log.info("Node id: %s is ready" % self.node.id)
        self.nodepool.updateStats(session, self.provider.name)

        if self.target.jenkins_url:
            self.log.debug("Adding node id: %s to jenkins" % self.node.id)
            self.createJenkinsNode()
            self.log.info("Node id: %s added to jenkins" % self.node.id)

        if self.target.assign_via_gearman:
            self.log.info("Node id: %s assigning via gearman" % self.node.id)
            self.assignViaGearman()

        return dt

    def createJenkinsNode(self):
        jenkins = self.nodepool.getJenkinsManager(self.target)

        args = dict(name=self.node.nodename,
                    host=self.node.ip,
                    description='Dynamic single use %s node' % self.label.name,
                    executors=1,
                    root=self.image.user_home)
        if not self.target.jenkins_test_job:
            args['labels'] = self.label.name
        if self.target.jenkins_credentials_id:
            args['credentials_id'] = self.target.jenkins_credentials_id
        else:
            args['username'] = self.image.username
            args['private_key'] = self.image.private_key

        jenkins.createNode(**args)

        if self.target.jenkins_test_job:
            params = dict(NODE=self.node.nodename)
            jenkins.startBuild(self.target.jenkins_test_job, params)

    def assignViaGearman(self):
        args = dict(name=self.node.nodename,
                    host=self.node.ip,
                    description='Dynamic single use %s node' % self.label.name,
                    labels=self.label.name,
                    root=self.image.user_home)
        job = jobs.NodeAssignmentJob(self.node.id, self.node.target_name,
                                     args, self.nodepool)
        self.nodepool.gearman_client.submitJob(job, timeout=300)
        job.waitForCompletion()
        self.log.info("Node id: %s received %s from assignment" % (
            self.node.id, job.data))
        if job.failure:
            raise Exception("Node id: %s received job failure on assignment" %
                            self.node.id)
        data = json.loads(job.data[-1])
        self.node.manager_name = data['manager']

    def writeNodepoolInfo(self, nodelist):
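        """Write node-set metadata into /etc/nodepool on each node.

        Generates a fresh RSA keypair shared by the node set and
        records each node's role, IPs, provider details, and instance
        UUID. The image must already permit nodepool to write to
        /etc/nodepool.
        """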
        key = paramiko.RSAKey.generate(2048)
        public_key = key.get_name() + ' ' + key.get_base64()

        for role, n in nodelist:
            connect_kwargs = dict(key_filename=self.image.private_key)
            host = utils.ssh_connect(n.ip, self.image.username,
                                     connect_kwargs=connect_kwargs,
                                     timeout=self.timeout)
            if not host:
                raise Exception("Unable to log in via SSH")

            # This is a test for existence, not creation, on purpose.
            # Current requirements for nodepool nodes are that nodepool can
            # log in and that it can write to /etc/nodepool.
            host.ssh("test for config dir", "ls /etc/nodepool")

            ftp = host.client.open_sftp()

            # The role of this node
            f = ftp.open('/etc/nodepool/role', 'w')
            f.write(role + '\n')
            f.close()
            # The IP of this node
            f = ftp.open('/etc/nodepool/node', 'w')
            f.write(n.ip + '\n')
            f.close()
            # The private IP of this node
            f = ftp.open('/etc/nodepool/node_private', 'w')
            f.write(n.ip_private + '\n')
            f.close()
            # The IP of the primary node of this node set
            f = ftp.open('/etc/nodepool/primary_node', 'w')
            f.write(self.node.ip + '\n')
            f.close()
            # The private IP of the primary node of this node set
            f = ftp.open('/etc/nodepool/primary_node_private', 'w')
            f.write(self.node.ip_private + '\n')
            f.close()
            # The IPs of all sub nodes in this node set
            f = ftp.open('/etc/nodepool/sub_nodes', 'w')
            for subnode in self.node.subnodes:
                f.write(subnode.ip + '\n')
            f.close()
            # The private IPs of all sub nodes in this node set
            f = ftp.open('/etc/nodepool/sub_nodes_private', 'w')
            for subnode in self.node.subnodes:
                f.write(subnode.ip_private + '\n')
            f.close()
            # The SSH key for this node set
            f = ftp.open('/etc/nodepool/id_rsa', 'w')
            key.write_private_key(f)
            f.close()
            f = ftp.open('/etc/nodepool/id_rsa.pub', 'w')
            f.write(public_key + '\n')
            f.close()
            # Provider information for this node set
            f = ftp.open('/etc/nodepool/provider', 'w')
            f.write('NODEPOOL_PROVIDER=%s\n' % self.provider.name)
            f.write('NODEPOOL_CLOUD=%s\n' % self.provider.cloud_config.name)
            f.write('NODEPOOL_REGION=%s\n' % (
                self.provider.region_name or '',))
            f.write('NODEPOOL_AZ=%s\n' % (self.node.az or '',))
            f.close()
            # The instance UUID for this node
            f = ftp.open('/etc/nodepool/uuid', 'w')
            f.write(n.external_id + '\n')
            f.close()

            ftp.close()

    def runReadyScript(self, nodelist):
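        """Run the label's ready script on each node in the node set,
        passing through any NODEPOOL_* environment variables.
        """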
        for role, n in nodelist:
            connect_kwargs = dict(key_filename=self.image.private_key)
            host = utils.ssh_connect(n.ip, self.image.username,
                                     connect_kwargs=connect_kwargs,
                                     timeout=self.timeout)
            if not host:
                raise Exception("Unable to log in via SSH")

            env_vars = ''
            for k, v in os.environ.items():
                if k.startswith('NODEPOOL_'):
                    env_vars += ' %s="%s"' % (k, v)
            host.ssh("run ready script",
                     "cd /opt/nodepool-scripts && %s ./%s %s" %
                     (env_vars, self.label.ready_script, n.hostname),
                     output=True)


class SubNodeLauncher(threading.Thread):
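    """Launch a subnode of a multi-node (primary plus subnodes) label.

    Follows the same boot and SSH-check sequence as NodeLauncher but
    marks the subnode READY directly; the primary node's NodeLauncher
    waits for its subnodes before finishing preparation.
    """
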
    log = logging.getLogger("nodepool.SubNodeLauncher")

    def __init__(self, nodepool, provider, label, subnode_id,
                 node_id, node_target_name, timeout, launch_timeout, node_az,
                 manager_name):
        threading.Thread.__init__(self, name='SubNodeLauncher for %s'
                                  % subnode_id)
        self.provider = provider
        self.label = label
        self.image = provider.images[label.image]
        self.node_target_name = node_target_name
        self.subnode_id = subnode_id
        self.node_id = node_id
        self.timeout = timeout
        self.nodepool = nodepool
        self.launch_timeout = launch_timeout
        self.node_az = node_az
        self.manager_name = manager_name

    def run(self):
        try:
            self._run()
        except Exception:
            self.log.exception("Exception in run method:")

    def _run(self):
        with self.nodepool.getDB().getSession() as session:
            self.log.debug("Launching subnode id: %s for node id: %s" %
                           (self.subnode_id, self.node_id))
            try:
                self.subnode = session.getSubNode(self.subnode_id)
                self.manager = self.nodepool.getProviderManager(self.provider)
            except Exception:
                self.log.exception("Exception preparing to launch subnode "
                                   "id: %s for node id: %s:"
                                   % (self.subnode_id, self.node_id))
                return

            try:
                start_time = time.time()
                dt = self.launchSubNode(session)
                failed = False
                statsd_key = 'ready'
            except Exception as e:
                self.log.exception("%s launching subnode id: %s "
                                   "for node id: %s in provider: %s error:" %
                                   (e.__class__.__name__, self.subnode_id,
                                    self.node_id, self.provider.name))
                dt = int((time.time() - start_time) * 1000)
                failed = True
                if hasattr(e, 'statsd_key'):
                    statsd_key = e.statsd_key
                else:
                    statsd_key = 'error.unknown'

            try:
                self.nodepool.launchStats(statsd_key, dt, self.image.name,
                                          self.provider.name,
                                          self.node_target_name,
                                          self.node_az,
                                          self.manager_name)
            except Exception:
                self.log.exception("Exception reporting launch stats:")

            if failed:
                try:
                    self.nodepool.deleteSubNode(self.subnode, self.manager)
                except Exception:
                    self.log.exception("Exception deleting subnode id: %s: "
                                       "for node id: %s:" %
                                       (self.subnode_id, self.node_id))
                return

    def launchSubNode(self, session):
        start_time = time.time()
        timestamp = int(start_time)

        target = self.nodepool.config.targets[self.node_target_name]
        hostname = target.subnode_hostname.format(
            label=self.label, provider=self.provider, node_id=self.node_id,
            subnode_id=self.subnode_id, timestamp=str(timestamp))
        self.subnode.hostname = hostname
        self.subnode.nodename = hostname.split('.')[0]

        cloud_image = self.nodepool.zk.getMostRecentImageUpload(
            self.image.name, self.provider.name)
        if not cloud_image:
            raise LaunchNodepoolException("Unable to find current cloud "
                                          "image %s in %s" %
                                          (self.image.name,
                                           self.provider.name))

        self.log.info("Creating server with hostname %s in %s from image %s "
                      "for subnode id: %s for node id: %s"
                      % (hostname, self.provider.name,
                         self.image.name, self.subnode_id, self.node_id))
        server = self.manager.createServer(
            hostname, self.image.min_ram, cloud_image.external_id,
            name_filter=self.image.name_filter, az=self.node_az,
            key_name=self.image.key_name,
            config_drive=self.image.config_drive,
            nodepool_node_id=self.node_id,
            nodepool_image_name=self.image.name)
        server_id = server['id']
        self.subnode.external_id = server_id
        session.commit()

        self.log.debug("Waiting for server %s for subnode id: %s for "
                       "node id: %s" %
                       (server_id, self.subnode_id, self.node_id))
        server = self.manager.waitForServer(server, self.launch_timeout)
        if server['status'] != 'ACTIVE':
            raise LaunchStatusException("Server %s for subnode id: "
                                        "%s for node id: %s "
                                        "status: %s" %
                                        (server_id, self.subnode_id,
                                         self.node_id, server['status']))

        ip = server.get('public_v4')
        ip_v6 = server.get('public_v6')
        if self.provider.ipv6_preferred:
            if ip_v6:
                ip = ip_v6
            else:
                self.log.warning('Preferred ipv6 not available, '
                                 'falling back to ipv4.')
        if not ip:
            raise LaunchNetworkException("Unable to find public IP of server")

        self.subnode.ip_private = server.get('private_v4')
        # devstack-gate multi-node depends on private_v4 being populated
        # with something. On clouds that don't have a private address, use
        # the public.
        if not self.subnode.ip_private:
            self.subnode.ip_private = server.get('public_v4')
        self.subnode.ip = ip
        self.log.debug("Subnode id: %s for node id: %s is running, "
                       "ipv4: %s, ipv6: %s" %
                       (self.subnode_id, self.node_id, server.get('public_v4'),
                        server.get('public_v6')))

        self.log.debug("Subnode id: %s for node id: %s testing ssh at ip: %s" %
                       (self.subnode_id, self.node_id, ip))
        connect_kwargs = dict(key_filename=self.image.private_key)
        if not utils.ssh_connect(ip, self.image.username,
                                 connect_kwargs=connect_kwargs,
                                 timeout=self.timeout):
            raise LaunchAuthException("Unable to connect via ssh")

        # Save the elapsed time for statsd
        dt = int((time.time() - start_time) * 1000)

        self.subnode.state = nodedb.READY
        self.log.info("Subnode id: %s for node id: %s is ready"
                      % (self.subnode_id, self.node_id))
        self.nodepool.updateStats(session, self.provider.name)

        return dt


class NodePool(threading.Thread):
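    """The main nodepool daemon.

    Each pass through the main loop reloads configuration, computes
    node demand, and launches nodes via the allocation system; cron
    triggers run periodic cleanup of stale nodes and leaked instances
    and periodic SSH health checks.
    """
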
    log = logging.getLogger("nodepool.NodePool")

    def __init__(self, securefile, configfile, no_deletes=False,
                 no_launches=False, watermark_sleep=WATERMARK_SLEEP):
        threading.Thread.__init__(self, name='NodePool')
        self.securefile = securefile
        self.configfile = configfile
        self.no_deletes = no_deletes
        self.no_launches = no_launches
        self.watermark_sleep = watermark_sleep
        self._stopped = False
        self.config = None
        self.zmq_context = None
        self.gearman_client = None
        self.apsched = None
        self.zk = None
        self.statsd = stats.get_client()
        self._delete_threads = {}
        self._delete_threads_lock = threading.Lock()
        self._instance_delete_threads = {}
        self._instance_delete_threads_lock = threading.Lock()
        self._wake_condition = threading.Condition()

    def stop(self):
        self._stopped = True
        self._wake_condition.acquire()
        self._wake_condition.notify()
        self._wake_condition.release()
        if self.config:
            for z in self.config.zmq_publishers.values():
                z.listener.stop()
                z.listener.join()
            provider_manager.ProviderManager.stopProviders(self.config)
        if self.zmq_context:
            self.zmq_context.destroy()
        if self.apsched and self.apsched.running:
            self.apsched.shutdown()
        if self.gearman_client:
            self.gearman_client.shutdown()
        self.log.debug("finished stopping")

    def loadConfig(self):
        self.log.debug("Loading configuration")
        config = nodepool_config.loadConfig(self.configfile)
        nodepool_config.loadSecureConfig(config, self.securefile)
        return config

    def reconfigureDatabase(self, config):
        if (not self.config) or config.dburi != self.config.dburi:
            config.db = nodedb.NodeDatabase(config.dburi)
        else:
            config.db = self.config.db

    def reconfigureManagers(self, config, check_targets=True):
        provider_manager.ProviderManager.reconfigure(self.config, config)

        stop_managers = []
        for t in config.targets.values():
            oldmanager = None
            if self.config:
                oldmanager = self.config.jenkins_managers.get(t.name)
            if oldmanager:
                if (t.jenkins_url != oldmanager.target.jenkins_url or
                    t.jenkins_user != oldmanager.target.jenkins_user or
                    t.jenkins_apikey != oldmanager.target.jenkins_apikey):
                    stop_managers.append(oldmanager)
                    oldmanager = None
            if oldmanager:
                config.jenkins_managers[t.name] = oldmanager
            elif t.jenkins_url:
                self.log.debug("Creating new JenkinsManager object "
                               "for %s" % t.name)
                config.jenkins_managers[t.name] = \
                    jenkins_manager.JenkinsManager(t)
                config.jenkins_managers[t.name].start()
        for oldmanager in stop_managers:
            oldmanager.stop()

        # Only check the status of the targets if requested.
        if check_targets:
            for t in config.targets.values():
                if t.jenkins_url:
                    try:
                        info = config.jenkins_managers[t.name].getInfo()
                        if info['quietingDown']:
                            self.log.info("Target %s is offline" % t.name)
                            t.online = False
                        else:
                            t.online = True
                    except Exception:
                        self.log.exception("Unable to check status of %s" %
                                           t.name)
                        t.online = False

    def reconfigureCrons(self, config):
        cron_map = {
            'cleanup': self._doPeriodicCleanup,
            'check': self._doPeriodicCheck,
        }

        if not self.apsched:
            self.apsched = apscheduler.schedulers.background.BackgroundScheduler()
            self.apsched.start()

        for c in config.crons.values():
            if ((not self.config) or
                c.timespec != self.config.crons[c.name].timespec):
                if self.config and self.config.crons[c.name].job:
                    self.config.crons[c.name].job.remove()
                parts = c.timespec.split()
                if len(parts) > 5:
                    second = parts[5]
                else:
                    second = None
                minute, hour, dom, month, dow = parts[:5]
                trigger = apscheduler.triggers.cron.CronTrigger(
                    month=month, day=dom, day_of_week=dow,
                    hour=hour, minute=minute, second=second)
                c.job = self.apsched.add_job(
                    cron_map[c.name], trigger=trigger)
            else:
                c.job = self.config.crons[c.name].job

    def reconfigureUpdateListeners(self, config):
        if self.no_deletes:
            return
        if self.config:
            running = set(self.config.zmq_publishers.keys())
        else:
            running = set()

        configured = set(config.zmq_publishers.keys())
        if running == configured:
            self.log.debug("ZMQ listeners do not need to be updated")
            if self.config:
                config.zmq_publishers = self.config.zmq_publishers
            return

        if self.zmq_context:
            self.log.debug("Stopping listeners")
            self.zmq_context.destroy()
        self.zmq_context = zmq.Context()
        for z in config.zmq_publishers.values():
            self.log.debug("Starting listener for %s" % z.name)
            z.listener = NodeUpdateListener(self, z.name)
            z.listener.start()

    def reconfigureGearmanClient(self, config):
        if self.config:
            running = set(self.config.gearman_servers.keys())
        else:
            running = set()

        configured = set(config.gearman_servers.keys())
        if running == configured:
            self.log.debug("Gearman client does not need to be updated")
            if self.config:
                config.gearman_servers = self.config.gearman_servers
            return

        if self.gearman_client:
            self.log.debug("Stopping gearman client")
            self.gearman_client.shutdown()
            self.gearman_client = None
        if configured:
            self.gearman_client = GearmanClient()
            for g in config.gearman_servers.values():
                self.log.debug("Adding gearman server %s" % g.name)
                self.gearman_client.addServer(g.host, g.port)
            self.gearman_client.waitForServer()

    def reconfigureZooKeeper(self, config):
        if self.config:
            running = self.config.zookeeper_servers.values()
        else:
            running = None

        configured = config.zookeeper_servers.values()
        if running == configured:
            self.log.debug("ZooKeeper client does not need to be updated")
            return

        if not self.zk and configured:
            self.log.debug("Connecting to ZooKeeper servers")
            self.zk = zk.ZooKeeper()
            self.zk.connect(configured)
        else:
            self.log.debug("Detected ZooKeeper server changes")
            self.zk.resetHosts(configured)

    def setConfig(self, config):
        self.config = config

    def getDB(self):
        return self.config.db

    def getZK(self):
        return self.zk

    def getProviderManager(self, provider):
        return self.config.provider_managers[provider.name]

    def getJenkinsManager(self, target):
        if target.name in self.config.jenkins_managers:
            return self.config.jenkins_managers[target.name]
        else:
            raise KeyError("{0} not in {1}".format(target.name,
                           self.config.jenkins_managers.keys()))

    def getNeededNodes(self, session, allocation_history):
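        """Compute the list of nodes to launch.

        Demand per label is the Gearman queue depth (plus min-ready
        when capacity clearly allows) minus nodes already READY,
        BUILDING, USED, or TEST; the allocation system then spreads
        that demand across providers with available capacity. Returns
        a list of ((target, label, provider), count) tuples.
        """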
        self.log.debug("Beginning node launch calculation")
        # Get the current demand for nodes.
        if self.gearman_client:
            label_demand = self.gearman_client.getNeededWorkers()
        else:
            label_demand = {}

        for name, demand in label_demand.items():
            self.log.debug("  Demand from gearman: %s: %s" % (name, demand))

        online_targets = set()
        for target in self.config.targets.values():
            if not target.online:
                continue
            online_targets.add(target.name)

        nodes = session.getNodes()

        def count_nodes(label_name, state):
            return len([n for n in nodes
                        if (n.target_name in online_targets and
                            n.label_name == label_name and
                            n.state == state)])

        def count_nodes_and_subnodes(provider_name):
            count = 0
            for n in nodes:
                if n.provider_name != provider_name:
                    continue
                count += 1 + len(n.subnodes)
            return count

        # Add a provider for each node provider, along with current
        # capacity
        allocation_providers = {}
        for provider in self.config.providers.values():
            provider_max = provider.max_servers
            n_provider = count_nodes_and_subnodes(provider.name)
            available = provider_max - n_provider
            if available < 0:
                self.log.warning("Provider %s over-allocated: "
                                 "max-servers %d but counted %d " %
                                 (provider.name, provider_max, n_provider))
                available = 0
            ap = allocation.AllocationProvider(provider.name, available)
            allocation_providers[provider.name] = ap

        # Calculate demand for labels.
        # Actual need is: demand - (ready + building + used)
        # NOTE(jhesketh): This assumes that the nodes in use are actually being
        # used for a job in demand.
        for label in self.config.labels.values():
            start_demand = label_demand.get(label.name, 0)
            n_ready = count_nodes(label.name, nodedb.READY)
            n_building = count_nodes(label.name, nodedb.BUILDING)
            n_used = count_nodes(label.name, nodedb.USED)
            n_test = count_nodes(label.name, nodedb.TEST)
            ready = n_ready + n_building + n_used + n_test

            capacity = 0
            for provider in label.providers.values():
                capacity += allocation_providers[provider.name].available

            # Note actual_demand and extra_demand are written this way
            # because max(0, x - y + z) != max(0, x - y) + z.
            # The requested number of nodes minus those already available
            actual_demand = max(0, start_demand - ready)
            # Demand that accommodates extra demand from the min-ready value
            extra_demand = max(0, start_demand + label.min_ready - ready)
            # We only request extras for the min-ready value if there is
            # clearly capacity for them. This is to avoid the allocator
            # making bad choices spinning up nodes to satisfy min-ready when
            # there is "real" work to do with other nodes.
            if extra_demand <= capacity:
                demand = extra_demand
            else:
                demand = actual_demand

            label_demand[label.name] = demand
            self.log.debug("  Deficit: %s: %s "
                           "(start: %s min-ready: %s ready: %s capacity: %s)" %
                           (label.name, demand,
                            start_demand, label.min_ready, ready, capacity))

        # "Target-Label-Provider" -- the triplet of info that identifies
        # the source and location of each node. The mapping is
        # AllocationGrantTarget -> TargetLabelProvider, because
        # the allocation system produces AGTs as the final product.
        tlps = {}
        # label_name -> AllocationRequest
        allocation_requests = {}
        # Set up the request values in the allocation system
        for target in self.config.targets.values():
            if not target.online:
                continue
            at = allocation.AllocationTarget(target.name)
            for label in self.config.labels.values():
                ar = allocation_requests.get(label.name)
                if not ar:
                    # A request for a certain number of nodes of this
                    # label type. We may have already started a
                    # request from a previous target-label in this
                    # loop.
                    ar = allocation.AllocationRequest(label.name,
                                                      label_demand[label.name],
                                                      allocation_history)

                nodes = session.getNodes(label_name=label.name,
                                         target_name=target.name)
                allocation_requests[label.name] = ar
                ar.addTarget(at, len(nodes))
                for provider in label.providers.values():
                    image = self.zk.getMostRecentImageUpload(
                        label.image, provider.name)
                    if image:
                        # This request may be supplied by this provider
                        # (and nodes from this provider supplying this
                        # request should be distributed to this target).
                        sr, agt = ar.addProvider(
                            allocation_providers[provider.name],
                            at, label.subnodes)
                        tlps[agt] = (target, label,
                                     self.config.providers[provider.name])
                    else:
                        self.log.debug("  %s does not have image %s "
                                       "for label %s." % (provider.name,
                                                          label.image,
                                                          label.name))

        self.log.debug("  Allocation requests:")
        for ar in allocation_requests.values():
            self.log.debug('    %s' % ar)
            for sr in ar.sub_requests.values():
                self.log.debug('      %s' % sr)

        nodes_to_launch = []

        # Let the allocation system do its thing, and then examine
        # the AGT objects that it produces.
        self.log.debug("  Grants:")
        for ap in allocation_providers.values():
            ap.makeGrants()
            for g in ap.grants:
                self.log.debug('    %s' % g)
                for agt in g.targets:
                    self.log.debug('      %s' % agt)
                    tlp = tlps[agt]
                    nodes_to_launch.append((tlp, agt.amount))

        allocation_history.grantsDone()

        self.log.debug("Finished node launch calculation")
        return nodes_to_launch

    def getNeededSubNodes(self, session):
        nodes_to_launch = []
        for node in session.getNodes():
            if node.label_name in self.config.labels:
                expected_subnodes = \
                    self.config.labels[node.label_name].subnodes
                active_subnodes = len([n for n in node.subnodes
                                       if n.state != nodedb.DELETE])
                deficit = max(expected_subnodes - active_subnodes, 0)
                if deficit:
                    nodes_to_launch.append((node, deficit))
        return nodes_to_launch

    def updateConfig(self):
        config = self.loadConfig()
        self.reconfigureDatabase(config)
        self.reconfigureZooKeeper(config)
        self.reconfigureManagers(config)
        self.reconfigureUpdateListeners(config)
        self.reconfigureGearmanClient(config)
        self.reconfigureCrons(config)
        self.setConfig(config)

    def startup(self):
        self.updateConfig()

        # Currently nodepool can not resume building a node or image
        # after a restart. To clean up, mark all building nodes and
        # images for deletion when the daemon starts.
        with self.getDB().getSession() as session:
            for node in session.getNodes(state=nodedb.BUILDING):
                self.log.info("Setting building node id: %s to delete "
                              "on startup" % node.id)
                node.state = nodedb.DELETE

    def run(self):
        try:
            self.startup()
        except Exception:
            self.log.exception("Exception in startup:")
        allocation_history = allocation.AllocationHistory()
        while not self._stopped:
            try:
                self.updateConfig()
                with self.getDB().getSession() as session:
                    self._run(session, allocation_history)
            except Exception:
                self.log.exception("Exception in main loop:")
            self._wake_condition.acquire()
            self._wake_condition.wait(self.watermark_sleep)
            self._wake_condition.release()

    def _run(self, session, allocation_history):
        if self.no_launches:
            return
        # Make up the subnode deficit first to make sure that an
        # already allocated node has priority in filling its subnodes
        # ahead of new nodes.
        subnodes_to_launch = self.getNeededSubNodes(session)
        for (node, num_to_launch) in subnodes_to_launch:
            self.log.info("Need to launch %s subnodes for node id: %s" %
                          (num_to_launch, node.id))
            for i in range(num_to_launch):
                self.launchSubNode(session, node)

        nodes_to_launch = self.getNeededNodes(session, allocation_history)

        for (tlp, num_to_launch) in nodes_to_launch:
            (target, label, provider) = tlp
            if (not target.online) or (not num_to_launch):
                continue
            self.log.info("Need to launch %s %s nodes for %s on %s" %
                          (num_to_launch, label.name,
                           target.name, provider.name))
            for i in range(num_to_launch):
                cloud_image = self.zk.getMostRecentImageUpload(
                    label.image, provider.name)
                if not cloud_image:
                    self.log.debug("No current image for %s on %s"
                                   % (label.image, provider.name))
                else:
                    self.launchNode(session, provider, label, target)

    def launchNode(self, session, provider, label, target):
        try:
            self._launchNode(session, provider, label, target)
        except Exception:
            self.log.exception(
                "Could not launch node %s on %s", label.name, provider.name)

    def _launchNode(self, session, provider, label, target):
        provider = self.config.providers[provider.name]
        timeout = provider.boot_timeout
        launch_timeout = provider.launch_timeout
        if provider.azs:
            az = random.choice(provider.azs)
        else:
            az = None
        node = session.createNode(provider.name, label.name, target.name, az)
        t = NodeLauncher(self, provider, label, target, node.id, timeout,
                         launch_timeout)
        t.start()

    def launchSubNode(self, session, node):
        try:
            self._launchSubNode(session, node)
        except Exception:
            self.log.exception(
                "Could not launch subnode for node id: %s", node.id)

    def _launchSubNode(self, session, node):
        provider = self.config.providers[node.provider_name]
        label = self.config.labels[node.label_name]
        timeout = provider.boot_timeout
        launch_timeout = provider.launch_timeout
        subnode = session.createSubNode(node)
        t = SubNodeLauncher(self, provider, label, subnode.id,
                            node.id, node.target_name, timeout, launch_timeout,
                            node_az=node.az, manager_name=node.manager_name)
        t.start()

    def deleteSubNode(self, subnode, manager):
        # Don't try too hard here, the actual node deletion will make
        # sure this is cleaned up.
        if subnode.external_id:
            try:
                self.log.debug('Deleting server %s for subnode id: '
                               '%s of node id: %s' %
                               (subnode.external_id, subnode.id,
                                subnode.node.id))
                manager.cleanupServer(subnode.external_id)
                manager.waitForServerDeletion(subnode.external_id)
            except provider_manager.NotFound:
                pass
        subnode.delete()

    def deleteNode(self, node_id):
        try:
            self._delete_threads_lock.acquire()
            if node_id in self._delete_threads:
                return
            t = NodeDeleter(self, node_id)
            self._delete_threads[node_id] = t
            t.start()
        except Exception:
            self.log.exception("Could not delete node %s", node_id)
        finally:
            self._delete_threads_lock.release()

    def revokeAssignedNode(self, node):
        args = dict(name=node.nodename)
        job = jobs.NodeRevokeJob(node.id, node.manager_name,
                                 args, self)
        self.gearman_client.submitJob(job, timeout=300)
        # Do not wait for completion in case the manager is offline

    def _deleteNode(self, session, node):
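        """Tear down a node: remove it from Jenkins, revoke any
        Gearman assignment, delete its server and subnode servers from
        the provider, and remove it from the database.
        """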
        self.log.debug("Deleting node id: %s which has been in %s "
                       "state for %s hours" %
                       (node.id, nodedb.STATE_NAMES[node.state],
                        (time.time() - node.state_time) / (60 * 60)))
        # Delete a node
        if node.state != nodedb.DELETE:
            # Don't write to the session if not needed.
            node.state = nodedb.DELETE
        self.updateStats(session, node.provider_name)
        provider = self.config.providers[node.provider_name]
        target = self.config.targets[node.target_name]
        label = self.config.labels.get(node.label_name, None)
        if label and label.image in provider.images:
            image_name = provider.images[label.image].name
        else:
            image_name = None
        manager = self.getProviderManager(provider)

        if target.jenkins_url and (node.nodename is not None):
            jenkins = self.getJenkinsManager(target)
            jenkins_name = node.nodename
            if jenkins.nodeExists(jenkins_name):
                jenkins.deleteNode(jenkins_name)
                self.log.info("Deleted jenkins node id: %s" % node.id)

        if node.manager_name is not None:
            try:
                self.revokeAssignedNode(node)
            except Exception:
                self.log.exception("Exception revoking node id: %s" %
                                   node.id)

        for subnode in node.subnodes:
            if subnode.external_id:
                try:
                    self.log.debug('Deleting server %s for subnode id: '
                                   '%s of node id: %s' %
                                   (subnode.external_id, subnode.id, node.id))
                    manager.cleanupServer(subnode.external_id)
                except provider_manager.NotFound:
                    pass

        if node.external_id:
            try:
                self.log.debug('Deleting server %s for node id: %s' %
                               (node.external_id, node.id))
                manager.cleanupServer(node.external_id)
                manager.waitForServerDeletion(node.external_id)
            except provider_manager.NotFound:
                pass
            node.external_id = None

        for subnode in node.subnodes:
            if subnode.external_id:
                manager.waitForServerDeletion(subnode.external_id)
            subnode.delete()

        node.delete()
        self.log.info("Deleted node id: %s" % node.id)

        if self.statsd:
            dt = int((time.time() - node.state_time) * 1000)
            key = 'nodepool.delete.%s.%s.%s' % (image_name,
                                                node.provider_name,
                                                node.target_name)
            self.statsd.timing(key, dt)
            self.statsd.incr(key)
        self.updateStats(session, node.provider_name)

    def deleteInstance(self, provider_name, external_id):
        key = (provider_name, external_id)
        try:
            self._instance_delete_threads_lock.acquire()
            if key in self._instance_delete_threads:
                return
            t = InstanceDeleter(self, provider_name, external_id)
            self._instance_delete_threads[key] = t
            t.start()
        except Exception:
            self.log.exception("Could not delete instance %s on provider %s",
                               external_id, provider_name)
        finally:
            self._instance_delete_threads_lock.release()

    def _deleteInstance(self, provider_name, external_id):
        provider = self.config.providers[provider_name]
        manager = self.getProviderManager(provider)
        manager.cleanupServer(external_id)

    def _doPeriodicCleanup(self):
        if self.no_deletes:
            return
        try:
            self.periodicCleanup()
        except Exception:
            self.log.exception("Exception in periodic cleanup:")

    def periodicCleanup(self):
        # This function should be run periodically to clean up any hosts
        # that may have slipped through the cracks, as well as to remove
        # old images.

        self.log.debug("Starting periodic cleanup")

        for k, t in self._delete_threads.items()[:]:
            if not t.isAlive():
                del self._delete_threads[k]

        for k, t in self._instance_delete_threads.items()[:]:
            if not t.isAlive():
                del self._instance_delete_threads[k]

        node_ids = []
        with self.getDB().getSession() as session:
            for node in session.getNodes():
                node_ids.append(node.id)

        for node_id in node_ids:
            try:
                with self.getDB().getSession() as session:
                    node = session.getNode(node_id)
                    if node:
                        self.cleanupOneNode(session, node)
            except Exception:
                self.log.exception("Exception cleaning up node id %s:" %
                                   node_id)

        try:
            self.cleanupLeakedInstances()
        except Exception:
            self.log.exception("Exception cleaning up leaked nodes")

        self.log.debug("Finished periodic cleanup")

    def cleanupLeakedInstances(self):
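        """Delete provider servers that carry nodepool metadata but no
        longer correspond to a node in the database.
        """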
        known_providers = self.config.providers.keys()
        for provider in self.config.providers.values():
            manager = self.getProviderManager(provider)
            servers = manager.listServers()
            with self.getDB().getSession() as session:
                for server in servers:
                    meta = server.get('metadata', {}).get('nodepool')
                    if not meta:
                        self.log.debug("Instance %s (%s) in %s has no "
                                       "nodepool metadata" % (
                                           server['name'], server['id'],
                                           provider.name))
                        continue
                    meta = json.loads(meta)
                    if meta['provider_name'] not in known_providers:
                        self.log.debug("Instance %s (%s) in %s "
                                       "lists unknown provider %s" % (
                                           server['name'], server['id'],
                                           provider.name,
                                           meta['provider_name']))
                        continue
                    nodepool_id = meta.get('nodepool_id', None)
                    if provider.nodepool_id is not None and \
                       nodepool_id != provider.nodepool_id:
                        self.log.debug("Instance %s (%s) in %s "
                                       "was not launched by us" % (
                                           server['name'], server['id'],
                                           provider.name))
                        continue
                    node_id = meta.get('node_id')
                    if node_id:
                        if session.getNode(node_id):
                            continue
                        self.log.warning("Deleting leaked instance %s (%s) "
                                         "in %s for node id: %s" % (
                                             server['name'], server['id'],
                                             provider.name, node_id))
                        self.deleteInstance(provider.name, server['id'])
                    else:
                        self.log.warning("Instance %s (%s) in %s has no "
                                         "database id" % (
                                             server['name'], server['id'],
                                             provider.name))
                        continue
            if provider.clean_floating_ips:
                manager.cleanupLeakedFloaters()

    def cleanupOneNode(self, session, node):
        now = time.time()
        time_in_state = now - node.state_time
        if (node.state in [nodedb.READY, nodedb.HOLD]):
            return
        delete = False
        if (node.state == nodedb.DELETE):
            delete = True
        elif (node.state == nodedb.TEST and
              time_in_state > TEST_CLEANUP):
            delete = True
        elif time_in_state > NODE_CLEANUP:
            delete = True
        if delete:
            try:
                self.deleteNode(node.id)
            except Exception:
                self.log.exception("Exception deleting node id: "
                                   "%s" % node.id)

    def _doPeriodicCheck(self):
        if self.no_deletes:
            return
        try:
            with self.getDB().getSession() as session:
                self.periodicCheck(session)
        except Exception:
            self.log.exception("Exception in periodic check:")

    def periodicCheck(self, session):
        # This function should be run periodically to make sure we can
        # still access hosts via ssh.

        self.log.debug("Starting periodic check")
        for node in session.getNodes():
            if node.state != nodedb.READY:
                continue
            provider = self.config.providers[node.provider_name]
            if node.label_name in self.config.labels:
                label = self.config.labels[node.label_name]
                image = provider.images[label.image]
                connect_kwargs = dict(key_filename=image.private_key)
                try:
                    if utils.ssh_connect(node.ip, image.username,
                                         connect_kwargs=connect_kwargs):
                        continue
                except Exception:
                    self.log.exception("SSH Check failed for node id: %s" %
                                       node.id)
            else:
                self.log.warning("Node with non-existent label %s" %
                                 node.label_name)
            self.deleteNode(node.id)
        self.log.debug("Finished periodic check")

    def updateStats(self, session, provider_name):
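        """Report node counts by state, target, label, provider, and
        manager to statsd, along with each provider's max_servers.
        """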
        if not self.statsd:
            return
        # This may be called outside of the main thread.

        states = {}

        # nodepool.nodes.STATE
        # nodepool.target.TARGET.nodes.STATE
        # nodepool.label.LABEL.nodes.STATE
        # nodepool.provider.PROVIDER.nodes.STATE
        for state in nodedb.STATE_NAMES.values():
            key = 'nodepool.nodes.%s' % state
            states[key] = 0
            for target in self.config.targets.values():
                key = 'nodepool.target.%s.nodes.%s' % (
                    target.name, state)
                states[key] = 0
            for label in self.config.labels.values():
                key = 'nodepool.label.%s.nodes.%s' % (
                    label.name, state)
                states[key] = 0
            for provider in self.config.providers.values():
                key = 'nodepool.provider.%s.nodes.%s' % (
                    provider.name, state)
                states[key] = 0

        managers = set()

        for node in session.getNodes():
            if node.state not in nodedb.STATE_NAMES:
                continue
            state = nodedb.STATE_NAMES[node.state]
            key = 'nodepool.nodes.%s' % state
            total_nodes = self.config.labels[node.label_name].subnodes + 1
            states[key] += total_nodes

            # NOTE(pabelanger): Check if we assign nodes via Gearman; if so,
            # use the manager name.
            # nodepool.manager.MANAGER.nodes.STATE
            if node.manager_name:
                key = 'nodepool.manager.%s.nodes.%s' % (
                    node.manager_name, state)
                if key not in states:
                    states[key] = 0
                managers.add(node.manager_name)
            else:
                key = 'nodepool.target.%s.nodes.%s' % (
                    node.target_name, state)
            states[key] += total_nodes

            key = 'nodepool.label.%s.nodes.%s' % (
                node.label_name, state)
            states[key] += total_nodes

            key = 'nodepool.provider.%s.nodes.%s' % (
                node.provider_name, state)
            states[key] += total_nodes

        # NOTE(pabelanger): Initialize other state values to zero if missed
        # above.
        # nodepool.manager.MANAGER.nodes.STATE
        for state in nodedb.STATE_NAMES.values():
            for manager_name in managers:
                key = 'nodepool.manager.%s.nodes.%s' % (
                    manager_name, state)
                if key not in states:
                    states[key] = 0

        for key, count in states.items():
            self.statsd.gauge(key, count)

        # nodepool.provider.PROVIDER.max_servers
        for provider in self.config.providers.values():
            key = 'nodepool.provider.%s.max_servers' % provider.name
            self.statsd.gauge(key, provider.max_servers)

    def launchStats(self, subkey, dt, image_name,
                    provider_name, target_name, node_az, manager_name):
        if not self.statsd:
            return
        # nodepool.launch.provider.PROVIDER.subkey
        # nodepool.launch.image.IMAGE.subkey
        # nodepool.launch.subkey
        keys = [
            'nodepool.launch.provider.%s.%s' % (provider_name, subkey),
            'nodepool.launch.image.%s.%s' % (image_name, subkey),
            'nodepool.launch.%s' % (subkey,),
        ]
        if node_az:
            # nodepool.launch.provider.PROVIDER.AZ.subkey
            keys.append('nodepool.launch.provider.%s.%s.%s' %
                        (provider_name, node_az, subkey))

        if manager_name:
            # NOTE(pabelanger): Check if we assign nodes via Gearman; if so,
            # use the manager name.
            # nodepool.launch.manager.MANAGER.subkey
            keys.append('nodepool.launch.manager.%s.%s' %
                        (manager_name, subkey))
        else:
            # nodepool.launch.target.TARGET.subkey
            keys.append('nodepool.launch.target.%s.%s' %
                        (target_name, subkey))

        for key in keys:
            self.statsd.timing(key, dt)
            self.statsd.incr(key)