initial checkin
This commit is contained in:
commit
0ddd638b68
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
||||
*.pyc
|
9
README.md
Normal file
9
README.md
Normal file
@ -0,0 +1,9 @@
|
||||
rcbau-ci
|
||||
========
|
||||
|
||||
A set of CI tools used by RCBAU.
|
||||
|
||||
worker.py is a worker server that loads and runs task_plugins.
|
||||
|
||||
Each task_plugin is a gearman worker that implements, handles and executes a
|
||||
job.
|
10
etc/sql-migrate-gearman-worker.json
Normal file
10
etc/sql-migrate-gearman-worker.json
Normal file
@ -0,0 +1,10 @@
|
||||
{
|
||||
"zuul_server": {
|
||||
"git_url": "/home/josh/var/lib/zuul/git/",
|
||||
"gearman_host": "localhost",
|
||||
"gearman_port": 4730
|
||||
},
|
||||
"debug_log": "debug.log",
|
||||
"git_working_dir": "/home/josh/var/lib/ci-worker/git/",
|
||||
"job_log_dir": "/home/josh/var/lib/ci-worker/logs/"
|
||||
}
|
158
etc/worker_server.init
Normal file
158
etc/worker_server.init
Normal file
@ -0,0 +1,158 @@
|
||||
#! /bin/sh
|
||||
### BEGIN INIT INFO
|
||||
# Provides: worker-server
|
||||
# Required-Start: $remote_fs $syslog
|
||||
# Required-Stop: $remote_fs $syslog
|
||||
# Default-Start: 2 3 4 5
|
||||
# Default-Stop: 0 1 6
|
||||
# Short-Description: CI Worker
|
||||
# Description: Service to run extra CI tests
|
||||
### END INIT INFO
|
||||
|
||||
# Do NOT "set -e"
|
||||
|
||||
# PATH should only include /usr/* if it runs after the mountnfs.sh script
|
||||
PATH=/sbin:/usr/sbin:/bin:/usr/bin
|
||||
DESC="CI Worker Server"
|
||||
NAME=ci-worker-server
|
||||
DAEMON=/usr/local/bin/worker_server.py
|
||||
PIDFILE=/var/run/$NAME/$NAME.pid
|
||||
DAEMON_ARGS="-c /etc/ci-worker/config.json -p $PIDFILE"
|
||||
SCRIPTNAME=/etc/init.d/$NAME
|
||||
USER=ciworker
|
||||
|
||||
# Exit if the package is not installed
|
||||
[ -x "$DAEMON" ] || exit 0
|
||||
|
||||
# Read configuration variable file if it is present
|
||||
[ -r /etc/default/$NAME ] && . /etc/default/$NAME
|
||||
|
||||
# Load the VERBOSE setting and other rcS variables
|
||||
. /lib/init/vars.sh
|
||||
|
||||
# Define LSB log_* functions.
|
||||
# Depend on lsb-base (>= 3.0-6) to ensure that this file is present.
|
||||
. /lib/lsb/init-functions
|
||||
|
||||
#
|
||||
# Function that starts the daemon/service
|
||||
#
|
||||
do_start()
|
||||
{
|
||||
# Return
|
||||
# 0 if daemon has been started
|
||||
# 1 if daemon was already running
|
||||
# 2 if daemon could not be started
|
||||
|
||||
mkdir -p /var/run/$NAME
|
||||
chown $USER /var/run/$NAME
|
||||
start-stop-daemon --start --quiet --pidfile $PIDFILE -c $USER --exec $DAEMON --test > /dev/null \
|
||||
|| return 1
|
||||
start-stop-daemon --start --quiet --pidfile $PIDFILE -c $USER --exec $DAEMON -- \
|
||||
$DAEMON_ARGS \
|
||||
|| return 2
|
||||
# Add code here, if necessary, that waits for the process to be ready
|
||||
# to handle requests from services started subsequently which depend
|
||||
# on this one. As a last resort, sleep for some time.
|
||||
}
|
||||
|
||||
#
|
||||
# Function that stops the daemon/service
|
||||
#
|
||||
do_stop()
|
||||
{
|
||||
# Return
|
||||
# 0 if daemon has been stopped
|
||||
# 1 if daemon was already stopped
|
||||
# 2 if daemon could not be stopped
|
||||
# other if a failure occurred
|
||||
start-stop-daemon --stop --signal 9 --pidfile $PIDFILE
|
||||
RETVAL="$?"
|
||||
[ "$RETVAL" = 2 ] && return 2
|
||||
rm -f /var/run/$NAME/*
|
||||
return "$RETVAL"
|
||||
}
|
||||
|
||||
#
|
||||
# Function that stops the daemon/service
|
||||
#
|
||||
#do_graceful_stop()
|
||||
#{
|
||||
# PID=`cat $PIDFILE`
|
||||
# kill -USR1 $PID
|
||||
#
|
||||
# # wait until really stopped
|
||||
# if [ -n "${PID:-}" ]; then
|
||||
# i=0
|
||||
# while kill -0 "${PID:-}" 2> /dev/null; do
|
||||
# if [ $i -eq '0' ]; then
|
||||
# echo -n " ... waiting "
|
||||
# else
|
||||
# echo -n "."
|
||||
# fi
|
||||
# i=$(($i+1))
|
||||
# sleep 1
|
||||
# done
|
||||
# fi
|
||||
#
|
||||
# rm -f /var/run/$NAME/*
|
||||
#}
|
||||
|
||||
#
|
||||
# Function that sends a SIGHUP to the daemon/service
|
||||
#
|
||||
#do_reload() {
|
||||
# #
|
||||
# # If the daemon can reload its configuration without
|
||||
# # restarting (for example, when it is sent a SIGHUP),
|
||||
# # then implement that here.
|
||||
# #
|
||||
# start-stop-daemon --stop --signal 1 --quiet --pidfile $PIDFILE --name zuul-server
|
||||
# return 0
|
||||
#}
|
||||
|
||||
case "$1" in
|
||||
start)
|
||||
[ "$VERBOSE" != no ] && log_daemon_msg "Starting $DESC" "$NAME"
|
||||
do_start
|
||||
case "$?" in
|
||||
0|1) [ "$VERBOSE" != no ] && log_end_msg 0 ;;
|
||||
2) [ "$VERBOSE" != no ] && log_end_msg 1 ;;
|
||||
esac
|
||||
;;
|
||||
stop)
|
||||
[ "$VERBOSE" != no ] && log_daemon_msg "Stopping $DESC" "$NAME"
|
||||
do_stop
|
||||
case "$?" in
|
||||
0|1) [ "$VERBOSE" != no ] && log_end_msg 0 ;;
|
||||
2) [ "$VERBOSE" != no ] && log_end_msg 1 ;;
|
||||
esac
|
||||
;;
|
||||
status)
|
||||
status_of_proc "$DAEMON" "$NAME" && exit 0 || exit $?
|
||||
;;
|
||||
# reload)
|
||||
# #
|
||||
# # If do_reload() is not implemented then leave this commented out
|
||||
# # and leave 'force-reload' as an alias for 'restart'.
|
||||
# #
|
||||
# log_daemon_msg "Reloading $DESC" "$NAME"
|
||||
# do_reload
|
||||
# log_end_msg $?
|
||||
# ;;
|
||||
restart|force-reload)
|
||||
#
|
||||
# If the "reload" option is implemented then remove the
|
||||
# 'force-reload' alias
|
||||
#
|
||||
log_daemon_msg "Restarting $DESC" "$NAME"
|
||||
do_stop
|
||||
do_start
|
||||
;;
|
||||
*)
|
||||
echo "Usage: $SCRIPTNAME {start|stop|status|restart|force-reload}" >&2
|
||||
exit 3
|
||||
;;
|
||||
esac
|
||||
|
||||
:
|
0
lib/__init__.py
Normal file
0
lib/__init__.py
Normal file
142
lib/utils.py
Normal file
142
lib/utils.py
Normal file
@ -0,0 +1,142 @@
|
||||
# Copyright ....
|
||||
|
||||
import git
|
||||
import logging
|
||||
import os
|
||||
import select
|
||||
import subprocess
|
||||
import time
|
||||
|
||||
|
||||
class GitRepository(object):
|
||||
|
||||
""" Manage a git repository for our uses """
|
||||
log = logging.getLogger("rcbau-ci.GitRepository")
|
||||
|
||||
def __init__(self, remote_url, local_path):
|
||||
self.remote_url = remote_url
|
||||
self.local_path = local_path
|
||||
self._ensure_cloned()
|
||||
|
||||
self.repo = git.Repo(self.local_path)
|
||||
|
||||
def fetch(self, ref):
|
||||
# The git.remote.fetch method may read in git progress info and
|
||||
# interpret it improperly causing an AssertionError. Because the
|
||||
# data was fetched properly subsequent fetches don't seem to fail.
|
||||
# So try again if an AssertionError is caught.
|
||||
origin = self.repo.remotes.origin
|
||||
self.log.debug("Fetching %s from %s" % (ref, origin))
|
||||
|
||||
try:
|
||||
origin.fetch(ref)
|
||||
except AssertionError:
|
||||
origin.fetch(ref)
|
||||
|
||||
def checkout(self, ref):
|
||||
self.log.debug("Checking out %s" % ref)
|
||||
return self.repo.git.checkout(ref)
|
||||
|
||||
def _ensure_cloned(self):
|
||||
if not os.path.exists(self.local_path):
|
||||
self.log.debug("Cloning from %s to %s" % (self.remote_url,
|
||||
self.local_path))
|
||||
git.Repo.clone_from(self.remote_url, self.local_path)
|
||||
|
||||
""" ####UNUSED>....
|
||||
def _make_ssh_wrappesubprocessr(self, key):
|
||||
name = os.path.join(self.config['git_working_dir'], '.ssh_wrapper')
|
||||
fd = open(name, 'w')
|
||||
fd.write('#!/bin/bash\n')
|
||||
fd.write('ssh -i %s $@\n' % key)
|
||||
fd.close()
|
||||
os.chmod(name, 0755)
|
||||
os.environ['GIT_SSH'] = name
|
||||
|
||||
def construct_git_url(self, project_name):
|
||||
url = 'ssh://%s@%s:%s/%s' % (
|
||||
self.config['gerrit_server']['user'],
|
||||
self.config['gerrit_server']['host'],
|
||||
self.config['gerrit_server']['port'],
|
||||
project_name
|
||||
)
|
||||
url = 'https://%s/%s' % (
|
||||
self.config['gerrit_server']['host'],
|
||||
project_name
|
||||
)
|
||||
return url
|
||||
"""
|
||||
|
||||
|
||||
def execute_to_log(cmd, logfile, timeout=-1,
|
||||
watch_logs=[
|
||||
('[syslog]', '/var/log/syslog'),
|
||||
('[sqlslo]', '/var/log/mysql/slow-queries.log'),
|
||||
('[sqlerr]', '/var/log/mysql/error.log')
|
||||
],
|
||||
heartbeat=True
|
||||
):
|
||||
""" Executes a command and logs the STDOUT/STDERR and output of any
|
||||
supplied watch_logs from logs into a new logfile
|
||||
|
||||
watch_logs is a list of tuples with (name,file) """
|
||||
|
||||
if not os.path.isdir(os.path.dirname(logfile)):
|
||||
os.makedirs(os.path.dirname(logfile))
|
||||
|
||||
logger = logging.getLogger('execute_to_log')
|
||||
log_hanlder = logging.FileHandler(logfile)
|
||||
log_formatter = logging.Formatter('%(asctime)s %(message)s')
|
||||
log_hanlder.setFormatter(log_formatter)
|
||||
logger.addHandler(log_hanlder)
|
||||
|
||||
descriptors = {}
|
||||
|
||||
for watch_file in watch_logs:
|
||||
fd = os.open(watch_file[1], os.O_RDONLY)
|
||||
os.lseek(fd, 0, os.SEEK_END)
|
||||
descriptors[fd] = dict(
|
||||
name=watch_file[0],
|
||||
poll=select.POLLIN
|
||||
)
|
||||
|
||||
cmd += ' 2>&1'
|
||||
start_time = time.time()
|
||||
p = subprocess.Popen(
|
||||
cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
|
||||
descriptors[p.stdout.fileno()] = dict(
|
||||
name='[stdout]',
|
||||
poll=(select.POLLIN | select.POLLHUP)
|
||||
)
|
||||
descriptors[p.stderr.fileno()] = dict(
|
||||
name='[stderr]',
|
||||
poll=(select.POLLIN | select.POLLHUP)
|
||||
)
|
||||
|
||||
poll_obj = select.poll()
|
||||
for fd, descriptor in descriptors.items():
|
||||
poll_obj.register(fd, descriptor['poll'])
|
||||
|
||||
last_heartbeat = time.time()
|
||||
|
||||
while p.poll() is None:
|
||||
if timeout > 0 and time.time() - start_time > timeout:
|
||||
# Append to logfile
|
||||
logger.info("[timeout]")
|
||||
os.kill(p.pid, 9)
|
||||
|
||||
for fd, flag in poll_obj.poll(0):
|
||||
lines = os.read(fd, 1024 * 1024)
|
||||
for l in lines.split('\n'):
|
||||
if len(l) > 0:
|
||||
l = '%s %s' % (descriptors[fd]['name'], l)
|
||||
logger.info(l)
|
||||
last_heartbeat = time.time()
|
||||
|
||||
if time.time() - last_heartbeat > 30:
|
||||
# Append to logfile
|
||||
logger.info("[heartbeat]")
|
||||
last_heartbeat = time.time()
|
||||
|
||||
logger.info('[script exit code = %d]' % p.returncode)
|
0
task_plugins/__init__.py
Normal file
0
task_plugins/__init__.py
Normal file
0
task_plugins/gate_real_db_upgrade/__init__.py
Normal file
0
task_plugins/gate_real_db_upgrade/__init__.py
Normal file
157
task_plugins/gate_real_db_upgrade/task.py
Normal file
157
task_plugins/gate_real_db_upgrade/task.py
Normal file
@ -0,0 +1,157 @@
|
||||
# Copyright ...
|
||||
|
||||
import gear
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
|
||||
from lib import utils
|
||||
|
||||
__worker_name__ = 'sql-migrate-test-runner-%s' % os.uname()[1]
|
||||
|
||||
|
||||
class Runner(threading.Thread):
|
||||
|
||||
""" This thread handles the actual sql-migration tests.
|
||||
It pulls in a gearman job from the build:gate-real-db-upgrade
|
||||
queue and runs it through _handle_patchset"""
|
||||
|
||||
log = logging.getLogger("rcbau-ci.task_plugins.task.Runner")
|
||||
|
||||
def __init__(self, config):
|
||||
super(Runner, self).__init__()
|
||||
self._stop = threading.Event()
|
||||
self.config = config
|
||||
|
||||
# Set up the runner worker
|
||||
self.gearman_worker = None
|
||||
self.setup_gearman()
|
||||
|
||||
self.work_data = None
|
||||
self.cancelled = False
|
||||
|
||||
# Define the number of steps we will do to determine our progress.
|
||||
self.current_step = 0
|
||||
self.total_steps = 4
|
||||
|
||||
def setup_gearman(self):
|
||||
self.gearman_worker = gear.Worker(__worker_name__)
|
||||
self.gearman_worker.addServer(
|
||||
self.config['zuul_server']['gearman_host'],
|
||||
self.config['zuul_server']['gearman_port']
|
||||
)
|
||||
self.gearman_worker.registerFunction('build:gate-real-db-upgrade')
|
||||
|
||||
def stop(self):
|
||||
self._stop.set()
|
||||
# Unblock gearman
|
||||
self.log.debug("Telling gearman to stop waiting for jobs")
|
||||
self.gearman_worker.stopWaitingForJobs()
|
||||
self.gearman_worker.shutdown()
|
||||
|
||||
def stopped(self):
|
||||
return self._stop.isSet()
|
||||
|
||||
def stop_worker(self, number):
|
||||
self.cancelled = True
|
||||
|
||||
def run(self):
|
||||
while True and not self.stopped():
|
||||
try:
|
||||
# gearman_worker.getJob() blocks until a job is available
|
||||
logging.debug("Waiting for job")
|
||||
self.current_step = 0
|
||||
self.cancelled = False
|
||||
job = self.gearman_worker.getJob()
|
||||
self._handle_job(job)
|
||||
return
|
||||
except:
|
||||
logging.exception('Exception retrieving log event.')
|
||||
|
||||
def _handle_job(self, job):
|
||||
try:
|
||||
job_arguments = json.loads(job.arguments.decode('utf-8'))
|
||||
self.log.debug("Got job from ZUUL %s" % job_arguments)
|
||||
|
||||
# Send an initial WORK_DATA and WORK_STATUS packets
|
||||
self._send_work_data(job)
|
||||
|
||||
# Step 1: Checkout updates from git!
|
||||
self._do_next_step(job)
|
||||
|
||||
# Checkout the patchset
|
||||
local_path = self._grab_patchset(
|
||||
job_arguments['ZUUL_PROJECT'],
|
||||
job_arguments['ZUUL_REF']
|
||||
)
|
||||
|
||||
# Step 2:
|
||||
self._do_next_step(job)
|
||||
utils.execute_to_log(
|
||||
'ping && sleep 70',
|
||||
os.path.join(
|
||||
self.config['job_log_dir'],
|
||||
job.unique,
|
||||
'testing.log'
|
||||
),
|
||||
timeout=70
|
||||
)
|
||||
|
||||
# Step 3:
|
||||
self._do_next_step(job)
|
||||
|
||||
# Final step, send completed packet
|
||||
self._send_work_data(job)
|
||||
job.sendWorkComplete(json.dumps(self._get_work_data()))
|
||||
except Exception as e:
|
||||
self.log.exception('Exception handling log event.')
|
||||
if not self.cancelled:
|
||||
job.sendWorkException(str(e).encode('utf-8'))
|
||||
|
||||
def _grab_patchset(self, project_name, zuul_ref):
|
||||
""" Checkout the reference into config['git_working_dir'] """
|
||||
|
||||
repo = utils.GitRepository(
|
||||
self.config['zuul_server']['git_url'] + project_name,
|
||||
os.path.join(
|
||||
self.config['git_working_dir'],
|
||||
__worker_name__,
|
||||
project_name
|
||||
)
|
||||
)
|
||||
|
||||
repo.fetch(zuul_ref)
|
||||
repo.checkout('FETCH_HEAD')
|
||||
|
||||
return repo.local_path
|
||||
|
||||
def _get_work_data(self):
|
||||
if self.work_data is None:
|
||||
hostname = os.uname()[1]
|
||||
self.work_data = dict(
|
||||
name=__worker_name__,
|
||||
number=1,
|
||||
manager='rcbau-ci-manager-%s' % hostname,
|
||||
url='http://localhost',
|
||||
)
|
||||
return self.work_data
|
||||
|
||||
def _send_work_data(self, job):
|
||||
""" Send the WORK DATA in json format for job """
|
||||
job.sendWorkData(json.dumps(self._get_work_data()))
|
||||
|
||||
def _do_next_step(self, job):
|
||||
# Each opportunity we should check if we need to stop
|
||||
if self.stopped():
|
||||
self.work_data['result'] = "Failed: Worker interrupted/stopped"
|
||||
job.sendWorkStatus(self.current_step, self.total_steps)
|
||||
raise Exception('Thread stopped', 'stopping')
|
||||
elif self.cancelled:
|
||||
self.work_data['result'] = "Failed: Job cancelled"
|
||||
job.sendWorkStatus(self.current_step, self.total_steps)
|
||||
job.sendWorkFail()
|
||||
raise Exception('Job cancelled', 'stopping')
|
||||
|
||||
self.current_step += 1
|
||||
job.sendWorkStatus(self.current_step, self.total_steps)
|
118
task_plugins/gate_real_db_upgrade/test_sqlalchemy_migrations.sh
Normal file
118
task_plugins/gate_real_db_upgrade/test_sqlalchemy_migrations.sh
Normal file
@ -0,0 +1,118 @@
|
||||
#!/bin/bash
|
||||
|
||||
# $1 is the safe refs URL
|
||||
# $2 is the path to the git repo
|
||||
# $3 is the nova db user
|
||||
# $4 is the nova db password
|
||||
# $5 is the nova db name
|
||||
|
||||
pip_requires() {
|
||||
requires="tools/pip-requires"
|
||||
if [ ! -e $requires ]
|
||||
then
|
||||
requires="requirements.txt"
|
||||
fi
|
||||
echo "Install pip requirements from $requires"
|
||||
pip install -q -r $requires
|
||||
echo "Requirements installed"
|
||||
}
|
||||
|
||||
db_sync() {
|
||||
# $1 is the test target
|
||||
# $2 is the path to the git repo
|
||||
# $3 is the nova db user
|
||||
# $4 is the nova db password
|
||||
# $5 is the nova db name
|
||||
|
||||
# Create a nova.conf file
|
||||
cat - > $2/nova-$1.conf <<EOF
|
||||
[DEFAULT]
|
||||
sql_connection = mysql://$3:$4@localhost/$5?charset=utf8
|
||||
log_config = /srv/openstack-ci-tools/logging.conf
|
||||
EOF
|
||||
|
||||
find $2 -type f -name "*.pyc" -exec rm -f {} \;
|
||||
|
||||
nova_manage="$2/bin/nova-manage"
|
||||
if [ -e $nova_manage ]
|
||||
then
|
||||
echo "***** DB upgrade to state of $1 starts *****"
|
||||
python $nova_manage --config-file $2/nova-$1.conf db sync
|
||||
else
|
||||
python setup.py clean
|
||||
python setup.py develop
|
||||
echo "***** DB upgrade to state of $1 starts *****"
|
||||
nova-manage --config-file $2/nova-$1.conf db sync
|
||||
fi
|
||||
echo "***** DB upgrade to state of $1 finished *****"
|
||||
}
|
||||
|
||||
echo "To execute this script manually, run this:"
|
||||
echo "$0 $1 $2 $3 $4 $5"
|
||||
|
||||
set -x
|
||||
|
||||
# Setup the environment
|
||||
export PATH=/usr/lib/ccache:$PATH
|
||||
export PIP_DOWNLOAD_CACHE=/srv/cache/pip
|
||||
|
||||
# Restore database to known good state
|
||||
echo "Restoring test database $5"
|
||||
mysql --defaults-file=/srv/config/mysql -u root -e "drop database $5"
|
||||
mysql --defaults-file=/srv/config/mysql -u root -e "create database $5"
|
||||
mysql --defaults-file=/srv/config/mysql -u root -e "create user '$3'@'localhost' identified by '$4';"
|
||||
mysql --defaults-file=/srv/config/mysql -u root -e "grant all privileges on $5.* TO '$3'@'localhost';"
|
||||
mysql -u $3 --password=$4 $5 < /srv/datasets/$5.sql
|
||||
|
||||
echo "Build test environment"
|
||||
cd $2
|
||||
|
||||
set +x
|
||||
echo "Setting up virtual env"
|
||||
source ~/.bashrc
|
||||
source /etc/bash_completion.d/virtualenvwrapper
|
||||
rm -rf ~/virtualenvs/$1
|
||||
mkvirtualenv $1
|
||||
toggleglobalsitepackages
|
||||
set -x
|
||||
export PYTHONPATH=$PYTHONPATH:$2
|
||||
|
||||
# Some databases are from Folsom
|
||||
version=`mysql -u $3 --password=$4 $5 -e "select * from migrate_version \G" | grep version | sed 's/.*: //'`
|
||||
echo "Schema version is $version"
|
||||
if [ $version == "133" ]
|
||||
then
|
||||
echo "Database is from Folsom! Upgrade via grizzly"
|
||||
git checkout stable/grizzly
|
||||
pip_requires
|
||||
db_sync "grizzly" $2 $3 $4 $5
|
||||
fi
|
||||
|
||||
# Make sure the test DB is up to date with trunk
|
||||
git checkout target
|
||||
if [ `git show | grep "^\-\-\-" | grep "migrate_repo/versions" | wc -l` -gt 0 ]
|
||||
then
|
||||
echo "This change alters an existing migration, skipping trunk updates."
|
||||
else
|
||||
echo "Update database to current state of trunk"
|
||||
git checkout trunk
|
||||
pip_requires
|
||||
db_sync "trunk" $2 $3 $4 $5
|
||||
git checkout target
|
||||
fi
|
||||
|
||||
# Now run the patchset
|
||||
echo "Now test the patchset"
|
||||
pip_requires
|
||||
db_sync "patchset" $2 $3 $4 $5
|
||||
|
||||
# Determine the final schema version
|
||||
version=`mysql -u $3 --password=$4 $5 -e "select * from migrate_version \G" | grep version | sed 's/.*: //'`
|
||||
echo "Final schema version is $version"
|
||||
|
||||
# Cleanup virtual env
|
||||
set +x
|
||||
echo "Cleaning up virtual env"
|
||||
deactivate
|
||||
rmvirtualenv $1
|
||||
echo "done" > /srv/logs/$1
|
30
testing.log
Normal file
30
testing.log
Normal file
@ -0,0 +1,30 @@
|
||||
2013-07-25 16:21:35,001 [stdout] PING localhost (127.0.0.1) 56(84) bytes of data.
|
||||
2013-07-25 16:21:35,001 [stdout] 64 bytes from localhost (127.0.0.1): icmp_req=1 ttl=64 time=0.017 ms
|
||||
2013-07-25 16:21:36,000 [stdout] 64 bytes from localhost (127.0.0.1): icmp_req=2 ttl=64 time=0.018 ms
|
||||
2013-07-25 16:21:37,000 [stdout] 64 bytes from localhost (127.0.0.1): icmp_req=3 ttl=64 time=0.019 ms
|
||||
2013-07-25 16:21:38,000 [stdout] 64 bytes from localhost (127.0.0.1): icmp_req=4 ttl=64 time=0.022 ms
|
||||
2013-07-25 16:21:38,000 [stdout] --- localhost ping statistics ---
|
||||
2013-07-25 16:21:38,000 [stdout] 4 packets transmitted, 4 received, 0% packet loss, time 2998ms
|
||||
2013-07-25 16:21:38,000 [stdout] rtt min/avg/max/mdev = 0.017/0.019/0.022/0.002 ms
|
||||
2013-07-25 16:22:41,579 [stdout] PING localhost (127.0.0.1) 56(84) bytes of data.
|
||||
2013-07-25 16:22:41,579 [stdout] 64 bytes from localhost (127.0.0.1): icmp_req=1 ttl=64 time=0.018 ms
|
||||
2013-07-25 16:22:42,578 [stdout] 64 bytes from localhost (127.0.0.1): icmp_req=2 ttl=64 time=0.015 ms
|
||||
2013-07-25 16:22:43,577 [stdout] 64 bytes from localhost (127.0.0.1): icmp_req=3 ttl=64 time=0.026 ms
|
||||
2013-07-25 16:22:44,577 [stdout] 64 bytes from localhost (127.0.0.1): icmp_req=4 ttl=64 time=0.026 ms
|
||||
2013-07-25 16:22:44,577 [stdout] --- localhost ping statistics ---
|
||||
2013-07-25 16:22:44,577 [stdout] 4 packets transmitted, 4 received, 0% packet loss, time 2998ms
|
||||
2013-07-25 16:22:44,577 [stdout] rtt min/avg/max/mdev = 0.015/0.021/0.026/0.005 ms
|
||||
2013-07-25 16:23:14,577 [heartbeat]
|
||||
2013-07-25 16:25:04,155 [stderr] Usage: ping [-LRUbdfnqrvVaAD] [-c count] [-i interval] [-w deadline]
|
||||
2013-07-25 16:25:04,155 [stderr] [-p pattern] [-s packetsize] [-t ttl] [-I interface]
|
||||
2013-07-25 16:25:04,155 [stderr] [-M pmtudisc-hint] [-m mark] [-S sndbuf]
|
||||
2013-07-25 16:25:04,155 [stderr] [-T tstamp-options] [-Q tos] [hop1 ...] destination
|
||||
2013-07-25 16:38:37,987 [stderr] Usage: ping [-LRUbdfnqrvVaAD] [-c count] [-i interval] [-w deadline]
|
||||
2013-07-25 16:38:37,987 [stderr] [-p pattern] [-s packetsize] [-t ttl] [-I interface]
|
||||
2013-07-25 16:38:37,987 [stderr] [-M pmtudisc-hint] [-m mark] [-S sndbuf]
|
||||
2013-07-25 16:38:37,987 [stderr] [-T tstamp-options] [-Q tos] [hop1 ...] destination
|
||||
2013-07-25 16:43:17,558 [stderr] Usage: ping [-LRUbdfnqrvVaAD] [-c count] [-i interval] [-w deadline]
|
||||
2013-07-25 16:43:17,558 [stderr] [-p pattern] [-s packetsize] [-t ttl] [-I interface]
|
||||
2013-07-25 16:43:17,558 [stderr] [-M pmtudisc-hint] [-m mark] [-S sndbuf]
|
||||
2013-07-25 16:43:17,558 [stderr] [-T tstamp-options] [-Q tos] [hop1 ...] destination
|
||||
2013-07-25 16:43:17,558 [script exit code = 2]
|
69
worker_manager.py
Normal file
69
worker_manager.py
Normal file
@ -0,0 +1,69 @@
|
||||
# Copyright ...
|
||||
|
||||
import gear
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
|
||||
|
||||
class GearmanManager(threading.Thread):
|
||||
|
||||
""" This thread manages all of the launched gearman workers.
|
||||
As required by the zuul protocol it handles stopping builds when they
|
||||
are cancelled through stop:rcbau-ci-manager-%hostname.
|
||||
To do this it implements its own gearman worker waiting for events on
|
||||
that manager. """
|
||||
|
||||
log = logging.getLogger("rcbau-ci.worker_manager.Manager")
|
||||
|
||||
def __init__(self, config, tasks):
|
||||
super(GearmanManager, self).__init__()
|
||||
self._stop = threading.Event()
|
||||
self.config = config
|
||||
self.tasks = tasks
|
||||
|
||||
self.gearman_worker = None
|
||||
self.setup_gearman()
|
||||
|
||||
def setup_gearman(self):
|
||||
hostname = os.uname()[1]
|
||||
self.gearman_worker = gear.Worker('rcbau-manager-%s'
|
||||
% hostname)
|
||||
self.gearman_worker.addServer(
|
||||
self.config['zuul_server']['gearman_host'],
|
||||
self.config['zuul_server']['gearman_port']
|
||||
)
|
||||
self.gearman_worker.registerFunction(
|
||||
'stop:rcbau-ci-manager-%s' % hostname)
|
||||
|
||||
def stop(self):
|
||||
self._stop.set()
|
||||
# Unblock gearman
|
||||
self.log.debug("Telling gearman to stop waiting for jobs")
|
||||
self.gearman_worker.stopWaitingForJobs()
|
||||
self.gearman_worker.shutdown()
|
||||
|
||||
def stopped(self):
|
||||
return self._stop.isSet()
|
||||
|
||||
def run(self):
|
||||
while True and not self.stopped():
|
||||
try:
|
||||
# gearman_worker.getJob() blocks until a job is available
|
||||
logging.debug("Waiting for job")
|
||||
self.current_step = 0
|
||||
job = self.gearman_worker.getJob()
|
||||
self._handle_job(job)
|
||||
except:
|
||||
logging.exception('Exception retrieving log event.')
|
||||
|
||||
def _handle_job(self, job):
|
||||
""" Handle the requested job """
|
||||
try:
|
||||
job_arguments = json.loads(job.arguments.decode('utf-8'))
|
||||
self.tasks[job_arguments['name']].stop_worker(
|
||||
job_arguments['number'])
|
||||
except Exception as e:
|
||||
self.log.exception('Exception handling log event.')
|
||||
job.sendWorkException(str(e).encode('utf-8'))
|
115
worker_server.py
Executable file
115
worker_server.py
Executable file
@ -0,0 +1,115 @@
|
||||
#!/usr/bin/python2
|
||||
#
|
||||
# Copyright 2013 ...
|
||||
|
||||
import argparse
|
||||
import daemon
|
||||
import extras
|
||||
import imp
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
|
||||
import worker_manager
|
||||
|
||||
# as of python-daemon 1.6 it doesn't bundle pidlockfile anymore
|
||||
# instead it depends on lockfile-0.9.1 which uses pidfile.
|
||||
pid_file_module = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile'])
|
||||
|
||||
|
||||
class Server(object):
|
||||
|
||||
""" This is the worker server object to be daemonized """
|
||||
log = logging.getLogger("rcbau-ci.Server")
|
||||
|
||||
def __init__(self, config):
|
||||
# Config init
|
||||
self.config = config
|
||||
self.manager = None
|
||||
self.plugins = []
|
||||
self.load_plugins()
|
||||
|
||||
# Python logging output file.
|
||||
self.debug_log = self.config['debug_log']
|
||||
|
||||
self.tasks = {}
|
||||
|
||||
def setup_logging(self):
|
||||
if self.debug_log:
|
||||
logging.basicConfig(format='%(asctime)s %(message)s',
|
||||
filename=self.debug_log, level=logging.DEBUG)
|
||||
else:
|
||||
logging.basicConfig(format='%(asctime)s %(message)s',
|
||||
level=logging.WARN)
|
||||
self.log.debug('Log pusher starting.')
|
||||
|
||||
def load_plugins(self):
|
||||
""" Load the available plugins from task_plugins """
|
||||
# Load plugins
|
||||
for ent in os.listdir('task_plugins'):
|
||||
if (os.path.isdir('task_plugins/' + ent)
|
||||
and os.path.isfile('task_plugins/' + ent + '/task.py')):
|
||||
plugin_info = imp.find_module('task', ['task_plugins/' + ent])
|
||||
self.plugins.append(imp.load_module('task', *plugin_info))
|
||||
|
||||
def run_tasks(self):
|
||||
""" Run the tasks """
|
||||
for plugin in self.plugins:
|
||||
self.tasks[plugin.__worker_name__] = plugin.Runner(self.config)
|
||||
self.tasks[plugin.__worker_name__].daemon = True
|
||||
self.tasks[plugin.__worker_name__].start()
|
||||
|
||||
self.manager = worker_manager.GearmanManager(self.config, self.tasks)
|
||||
self.manager.daemon = True
|
||||
self.manager.start()
|
||||
|
||||
def exit_handler(self, signum):
|
||||
signal.signal(signal.SIGUSR1, signal.SIG_IGN)
|
||||
for task_name, task in self.tasks.items():
|
||||
task.stop()
|
||||
self.manager.stop()
|
||||
sys.exit(0)
|
||||
|
||||
def main(self):
|
||||
self.setup_logging()
|
||||
self.run_tasks()
|
||||
|
||||
while True:
|
||||
try:
|
||||
signal.pause()
|
||||
except KeyboardInterrupt:
|
||||
print "Ctrl + C: asking tasks to exit nicely...\n"
|
||||
self.exit_handler(signal.SIGINT)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('-c', '--config',
|
||||
default=
|
||||
'/etc/rcbau-ci/sql-migrate-gearman-worker.json',
|
||||
help='Path to json config file.')
|
||||
parser.add_argument('--foreground', action='store_true',
|
||||
help='Run in the foreground.')
|
||||
parser.add_argument('-p', '--pidfile',
|
||||
default='/var/run/rcbau-ci/'
|
||||
'sql-migrate-gearman-worker.pid',
|
||||
help='PID file to lock during daemonization.')
|
||||
args = parser.parse_args()
|
||||
|
||||
with open(args.config, 'r') as config_stream:
|
||||
config = json.load(config_stream)
|
||||
|
||||
server = Server(config)
|
||||
|
||||
if args.foreground:
|
||||
server.main()
|
||||
else:
|
||||
pidfile = pid_file_module.TimeoutPIDLockFile(args.pidfile, 10)
|
||||
with daemon.DaemonContext(pidfile=pidfile):
|
||||
server.main()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
Loading…
x
Reference in New Issue
Block a user